From 7c0df3b9675f54b02a7fc2b8ab02c02a1eb441d2 Mon Sep 17 00:00:00 2001 From: lash Date: Fri, 13 May 2022 13:47:58 +0000 Subject: [PATCH] Rehabilitate queuer and syncer cli --- chaind/eth/runnable/queuer.py | 75 ++++++++++++++++++++++++----------- chaind/eth/runnable/syncer.py | 19 +++++---- chaind/eth/settings.py | 29 ++++---------- 3 files changed, 69 insertions(+), 54 deletions(-) diff --git a/chaind/eth/runnable/queuer.py b/chaind/eth/runnable/queuer.py index ecc2c26..5833de5 100644 --- a/chaind/eth/runnable/queuer.py +++ b/chaind/eth/runnable/queuer.py @@ -5,8 +5,23 @@ import signal # external imports import chainlib.eth.cli -import chaind.cli -import chainqueue.cli +from chainlib.eth.cli.arg import ( + Arg, + ArgFlag, + process_args, + ) +from chainlib.eth.cli.config import ( + Config, + process_config, + ) +from chainqueue.cli.arg import ( + apply_arg as apply_arg_queue, + apply_flag as apply_flag_queue, + ) +from chaind.cli.arg import ( + apply_arg, + apply_flag, + ) from chaind.session import SessionController from chaind.setup import Environment from chaind.error import ( @@ -29,11 +44,21 @@ from chainlib.encode import TxHexNormalizer from chainlib.chain import ChainSpec from chaind.adapters.fs import ChaindFsAdapter from chaind.dispatch import DispatchProcessor +from chainqueue.data import config_dir as chainqueue_config_dir +from chaind.data import config_dir as chaind_config_dir +from chainlib.eth.cli.log import process_log # local imports from chaind.eth.cache import EthCacheTx -from chaind.eth.settings import ChaindEthSettings +from chaind.eth.settings import ChaindSettings from chaind.eth.dispatch import EthDispatcher +from chaind.eth.settings import process_settings +from chaind.settings import ( + process_queue, + process_socket, + process_dispatch, + ) + logging.basicConfig(level=logging.WARNING) logg = logging.getLogger() @@ -43,37 +68,41 @@ config_dir = os.path.join(script_dir, '..', 'data', 'config') env = Environment(domain='eth', env=os.environ) -arg_flags = chainlib.eth.cli.argflag_std_read -argparser = chainlib.eth.cli.ArgumentParser(arg_flags) +arg_flags = ArgFlag() +arg_flags = apply_flag_queue(arg_flags) +arg_flags = apply_flag(arg_flags) -queue_arg_flags = 0 -chainqueue.cli.process_flags(argparser, queue_arg_flags) +arg = Arg(arg_flags) +arg = apply_arg_queue(arg) +arg = apply_arg(arg) -local_arg_flags = chaind.cli.argflag_local_base | chaind.cli.ChaindFlag.DISPATCH | chaind.cli.ChaindFlag.SOCKET -chaind.cli.process_flags(argparser, local_arg_flags) +flags = arg_flags.STD_READ | arg_flags.QUEUE | arg_flags.STATE +argparser = chainlib.eth.cli.ArgumentParser() +argparser = process_args(argparser, arg, flags) args = argparser.parse_args() -base_config_dir = [chainqueue.cli.config_dir, chaind.cli.config_dir] -config = chainlib.eth.cli.Config.from_args(args, arg_flags, base_config_dir=base_config_dir) -config = chaind.cli.process_config(config, args, local_arg_flags) -config = chainqueue.cli.process_config(config, args, queue_arg_flags) +logg = process_log(args, logg) + +config = Config() +config.add_schema_dir(chainqueue_config_dir) +config.add_schema_dir(chaind_config_dir) +config = process_config(config, arg, args, flags) config.add('eth', 'CHAIND_ENGINE', False) -config.add('queue', 'CHAIND_COMPONENT', False) +config.add('sync', 'CHAIND_COMPONENT', False) logg.debug('config loaded:\n{}'.format(config)) -settings = ChaindEthSettings(include_queue=True) -settings.process(config) - -logg.debug('settings:\n{}'.format(settings)) - -rpc = chainlib.eth.cli.Rpc() -conn = rpc.connect_by_config(config) +settings = ChaindSettings(include_sync=True) +settings = process_settings(settings, config) +settings = process_queue(settings, config) +settings = process_socket(settings, config) +settings = process_dispatch(settings, config) +logg.debug('settings loaded:\n{}'.format(settings)) tx_normalizer = TxHexNormalizer().tx_hash token_cache_store = CacheTokenTx(settings.get('CHAIN_SPEC'), normalizer=tx_normalizer) -dispatcher = EthDispatcher(conn) +dispatcher = EthDispatcher(settings.get('CONN')) processor = DispatchProcessor(settings.get('CHAIN_SPEC'), settings.dir_for('queue'), dispatcher) ctrl = SessionController(settings, processor.process) @@ -110,7 +139,7 @@ def main(): pass if v == None: - ctrl.process(conn) + ctrl.process(settings.get('CONN')) #queue_adapter = create_adapter(settings, dispatcher) continue diff --git a/chaind/eth/runnable/syncer.py b/chaind/eth/runnable/syncer.py index 868d461..5291864 100644 --- a/chaind/eth/runnable/syncer.py +++ b/chaind/eth/runnable/syncer.py @@ -3,10 +3,7 @@ import os import logging # external imports -import chainlib.cli import chainlib.eth.cli -import chainsyncer.cli -import chaind.cli from chaind.setup import Environment from chaind.filter import StateFilter from chainlib.eth.block import block_latest @@ -38,7 +35,10 @@ from chaind.settings import ChaindSettings # local imports from chaind.eth.cache import EthCacheTx -from chaind.eth.settings import process_settings +from chaind.eth.settings import ( + process_settings, + process_sync, + ) logging.basicConfig(level=logging.WARNING) @@ -57,7 +57,7 @@ arg = Arg(arg_flags) arg = apply_arg_sync(arg) arg = apply_arg(arg) -flags = arg_flags.STD_BASE | arg_flags.CHAIN_SPEC | arg_flags.PROVIDER +flags = arg_flags.STD_BASE | arg_flags.CHAIN_SPEC | arg_flags.PROVIDER | arg_flags.SEQ | arg_flags.STATE flags = arg_flags.more(flags, arg_flags.SYNC_RANGE_EXT) flags = arg_flags.more(flags, arg_flags.CHAIND_BASE) @@ -71,27 +71,26 @@ config = Config() config.add_schema_dir(chainsyncer_config_dir) config.add_schema_dir(chaind_config_dir) config = process_config(config, arg, args, flags) -#config = process_config_local(config, arg, args, flags) config.add('eth', 'CHAIND_ENGINE', False) config.add('sync', 'CHAIND_COMPONENT', False) logg.debug('config loaded:\n{}'.format(config)) -settings = ChaindSettings() +settings = ChaindSettings(include_sync=True) settings = process_settings(settings, config) +settings = process_sync(settings, config) logg.debug('settings loaded:\n{}'.format(settings)) -sys.exit(0) def main(): fltr = StateFilter(settings.get('CHAIN_SPEC'), settings.dir_for('queue'), EthCacheTx) - sync_store = SyncFsStore(settings.get('SESSION_DATA_DIR'), session_id=settings.get('SESSION_ID')) + sync_store = SyncFsStore(settings.get('SESSION_DATA_PATH'), session_id=settings.get('SESSION_ID')) sync_store.register(fltr) logg.debug('session block offset {}'.format(settings.get('SYNCER_OFFSET'))) drv = ChainInterfaceDriver(sync_store, settings.get('SYNCER_INTERFACE'), offset=settings.get('SYNCER_OFFSET'), target=settings.get('SYNCER_LIMIT')) try: - drv.run(settings.get('RPC')) + drv.run(settings.get('CONN')) except SyncDone as e: logg.info('sync done: {}'.format(e)) diff --git a/chaind/eth/settings.py b/chaind/eth/settings.py index 577ebaf..3324bcb 100644 --- a/chaind/eth/settings.py +++ b/chaind/eth/settings.py @@ -1,13 +1,9 @@ # external imports from chainlib.eth.connection import EthHTTPConnection -from chainlib.settings import process_settings as base_process_settings +from chainlib.eth.settings import process_settings as base_process_settings from chaind.eth.chain import EthChainInterface from chaind.settings import * - - -def process_sync_interface(settings, config): - settings.set('SYNCER_INTERFACE', EthChainInterface()) - return settings +from chainsyncer.settings import process_sync_range def process_common(settings, config): @@ -19,26 +15,17 @@ def process_common(settings, config): return settings +def process_sync(settings, config): + settings.set('SYNCER_INTERFACE', EthChainInterface()) + settings = process_sync_range(settings, config) + return settings + + def process_settings(settings, config): settings = base_process_settings(settings, config) settings = process_common(settings, config) - settings = process_sync_interface(settings, config) - - if settings.include_queue: - settings = process_queue_backend(settings, config) - if settings.include_sync: - settings = process_sync_backend(settings, config) - settings = process_backend(settings, config) settings = process_session(settings, config) - - if settings.include_sync: - settings = process_sync(settings, config) - if settings.include_queue: - settings = process_chaind_queue(settings, config) - settings = process_dispatch(settings, config) - settings = process_token(settings, config) - settings = process_socket(settings, config) return settings