Rehabilitate queuer and syncer cli

This commit is contained in:
lash 2022-05-13 13:47:58 +00:00
parent acbbebd8da
commit 7c0df3b967
Signed by: lash
GPG Key ID: 21D2E7BB88C2A746
3 changed files with 69 additions and 54 deletions

View File

@ -5,8 +5,23 @@ import signal
# external imports # external imports
import chainlib.eth.cli import chainlib.eth.cli
import chaind.cli from chainlib.eth.cli.arg import (
import chainqueue.cli Arg,
ArgFlag,
process_args,
)
from chainlib.eth.cli.config import (
Config,
process_config,
)
from chainqueue.cli.arg import (
apply_arg as apply_arg_queue,
apply_flag as apply_flag_queue,
)
from chaind.cli.arg import (
apply_arg,
apply_flag,
)
from chaind.session import SessionController from chaind.session import SessionController
from chaind.setup import Environment from chaind.setup import Environment
from chaind.error import ( from chaind.error import (
@ -29,11 +44,21 @@ from chainlib.encode import TxHexNormalizer
from chainlib.chain import ChainSpec from chainlib.chain import ChainSpec
from chaind.adapters.fs import ChaindFsAdapter from chaind.adapters.fs import ChaindFsAdapter
from chaind.dispatch import DispatchProcessor from chaind.dispatch import DispatchProcessor
from chainqueue.data import config_dir as chainqueue_config_dir
from chaind.data import config_dir as chaind_config_dir
from chainlib.eth.cli.log import process_log
# local imports # local imports
from chaind.eth.cache import EthCacheTx from chaind.eth.cache import EthCacheTx
from chaind.eth.settings import ChaindEthSettings from chaind.eth.settings import ChaindSettings
from chaind.eth.dispatch import EthDispatcher from chaind.eth.dispatch import EthDispatcher
from chaind.eth.settings import process_settings
from chaind.settings import (
process_queue,
process_socket,
process_dispatch,
)
logging.basicConfig(level=logging.WARNING) logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger() logg = logging.getLogger()
@ -43,37 +68,41 @@ config_dir = os.path.join(script_dir, '..', 'data', 'config')
env = Environment(domain='eth', env=os.environ) env = Environment(domain='eth', env=os.environ)
arg_flags = chainlib.eth.cli.argflag_std_read arg_flags = ArgFlag()
argparser = chainlib.eth.cli.ArgumentParser(arg_flags) arg_flags = apply_flag_queue(arg_flags)
arg_flags = apply_flag(arg_flags)
queue_arg_flags = 0 arg = Arg(arg_flags)
chainqueue.cli.process_flags(argparser, queue_arg_flags) arg = apply_arg_queue(arg)
arg = apply_arg(arg)
local_arg_flags = chaind.cli.argflag_local_base | chaind.cli.ChaindFlag.DISPATCH | chaind.cli.ChaindFlag.SOCKET flags = arg_flags.STD_READ | arg_flags.QUEUE | arg_flags.STATE
chaind.cli.process_flags(argparser, local_arg_flags)
argparser = chainlib.eth.cli.ArgumentParser()
argparser = process_args(argparser, arg, flags)
args = argparser.parse_args() args = argparser.parse_args()
base_config_dir = [chainqueue.cli.config_dir, chaind.cli.config_dir] logg = process_log(args, logg)
config = chainlib.eth.cli.Config.from_args(args, arg_flags, base_config_dir=base_config_dir)
config = chaind.cli.process_config(config, args, local_arg_flags) config = Config()
config = chainqueue.cli.process_config(config, args, queue_arg_flags) config.add_schema_dir(chainqueue_config_dir)
config.add_schema_dir(chaind_config_dir)
config = process_config(config, arg, args, flags)
config.add('eth', 'CHAIND_ENGINE', False) config.add('eth', 'CHAIND_ENGINE', False)
config.add('queue', 'CHAIND_COMPONENT', False) config.add('sync', 'CHAIND_COMPONENT', False)
logg.debug('config loaded:\n{}'.format(config)) logg.debug('config loaded:\n{}'.format(config))
settings = ChaindEthSettings(include_queue=True) settings = ChaindSettings(include_sync=True)
settings.process(config) settings = process_settings(settings, config)
settings = process_queue(settings, config)
logg.debug('settings:\n{}'.format(settings)) settings = process_socket(settings, config)
settings = process_dispatch(settings, config)
rpc = chainlib.eth.cli.Rpc() logg.debug('settings loaded:\n{}'.format(settings))
conn = rpc.connect_by_config(config)
tx_normalizer = TxHexNormalizer().tx_hash tx_normalizer = TxHexNormalizer().tx_hash
token_cache_store = CacheTokenTx(settings.get('CHAIN_SPEC'), normalizer=tx_normalizer) token_cache_store = CacheTokenTx(settings.get('CHAIN_SPEC'), normalizer=tx_normalizer)
dispatcher = EthDispatcher(conn) dispatcher = EthDispatcher(settings.get('CONN'))
processor = DispatchProcessor(settings.get('CHAIN_SPEC'), settings.dir_for('queue'), dispatcher) processor = DispatchProcessor(settings.get('CHAIN_SPEC'), settings.dir_for('queue'), dispatcher)
ctrl = SessionController(settings, processor.process) ctrl = SessionController(settings, processor.process)
@ -110,7 +139,7 @@ def main():
pass pass
if v == None: if v == None:
ctrl.process(conn) ctrl.process(settings.get('CONN'))
#queue_adapter = create_adapter(settings, dispatcher) #queue_adapter = create_adapter(settings, dispatcher)
continue continue

View File

@ -3,10 +3,7 @@ import os
import logging import logging
# external imports # external imports
import chainlib.cli
import chainlib.eth.cli import chainlib.eth.cli
import chainsyncer.cli
import chaind.cli
from chaind.setup import Environment from chaind.setup import Environment
from chaind.filter import StateFilter from chaind.filter import StateFilter
from chainlib.eth.block import block_latest from chainlib.eth.block import block_latest
@ -38,7 +35,10 @@ from chaind.settings import ChaindSettings
# local imports # local imports
from chaind.eth.cache import EthCacheTx from chaind.eth.cache import EthCacheTx
from chaind.eth.settings import process_settings from chaind.eth.settings import (
process_settings,
process_sync,
)
logging.basicConfig(level=logging.WARNING) logging.basicConfig(level=logging.WARNING)
@ -57,7 +57,7 @@ arg = Arg(arg_flags)
arg = apply_arg_sync(arg) arg = apply_arg_sync(arg)
arg = apply_arg(arg) arg = apply_arg(arg)
flags = arg_flags.STD_BASE | arg_flags.CHAIN_SPEC | arg_flags.PROVIDER flags = arg_flags.STD_BASE | arg_flags.CHAIN_SPEC | arg_flags.PROVIDER | arg_flags.SEQ | arg_flags.STATE
flags = arg_flags.more(flags, arg_flags.SYNC_RANGE_EXT) flags = arg_flags.more(flags, arg_flags.SYNC_RANGE_EXT)
flags = arg_flags.more(flags, arg_flags.CHAIND_BASE) flags = arg_flags.more(flags, arg_flags.CHAIND_BASE)
@ -71,27 +71,26 @@ config = Config()
config.add_schema_dir(chainsyncer_config_dir) config.add_schema_dir(chainsyncer_config_dir)
config.add_schema_dir(chaind_config_dir) config.add_schema_dir(chaind_config_dir)
config = process_config(config, arg, args, flags) config = process_config(config, arg, args, flags)
#config = process_config_local(config, arg, args, flags)
config.add('eth', 'CHAIND_ENGINE', False) config.add('eth', 'CHAIND_ENGINE', False)
config.add('sync', 'CHAIND_COMPONENT', False) config.add('sync', 'CHAIND_COMPONENT', False)
logg.debug('config loaded:\n{}'.format(config)) logg.debug('config loaded:\n{}'.format(config))
settings = ChaindSettings() settings = ChaindSettings(include_sync=True)
settings = process_settings(settings, config) settings = process_settings(settings, config)
settings = process_sync(settings, config)
logg.debug('settings loaded:\n{}'.format(settings)) logg.debug('settings loaded:\n{}'.format(settings))
sys.exit(0)
def main(): def main():
fltr = StateFilter(settings.get('CHAIN_SPEC'), settings.dir_for('queue'), EthCacheTx) fltr = StateFilter(settings.get('CHAIN_SPEC'), settings.dir_for('queue'), EthCacheTx)
sync_store = SyncFsStore(settings.get('SESSION_DATA_DIR'), session_id=settings.get('SESSION_ID')) sync_store = SyncFsStore(settings.get('SESSION_DATA_PATH'), session_id=settings.get('SESSION_ID'))
sync_store.register(fltr) sync_store.register(fltr)
logg.debug('session block offset {}'.format(settings.get('SYNCER_OFFSET'))) logg.debug('session block offset {}'.format(settings.get('SYNCER_OFFSET')))
drv = ChainInterfaceDriver(sync_store, settings.get('SYNCER_INTERFACE'), offset=settings.get('SYNCER_OFFSET'), target=settings.get('SYNCER_LIMIT')) drv = ChainInterfaceDriver(sync_store, settings.get('SYNCER_INTERFACE'), offset=settings.get('SYNCER_OFFSET'), target=settings.get('SYNCER_LIMIT'))
try: try:
drv.run(settings.get('RPC')) drv.run(settings.get('CONN'))
except SyncDone as e: except SyncDone as e:
logg.info('sync done: {}'.format(e)) logg.info('sync done: {}'.format(e))

View File

@ -1,13 +1,9 @@
# external imports # external imports
from chainlib.eth.connection import EthHTTPConnection from chainlib.eth.connection import EthHTTPConnection
from chainlib.settings import process_settings as base_process_settings from chainlib.eth.settings import process_settings as base_process_settings
from chaind.eth.chain import EthChainInterface from chaind.eth.chain import EthChainInterface
from chaind.settings import * from chaind.settings import *
from chainsyncer.settings import process_sync_range
def process_sync_interface(settings, config):
settings.set('SYNCER_INTERFACE', EthChainInterface())
return settings
def process_common(settings, config): def process_common(settings, config):
@ -19,26 +15,17 @@ def process_common(settings, config):
return settings return settings
def process_sync(settings, config):
settings.set('SYNCER_INTERFACE', EthChainInterface())
settings = process_sync_range(settings, config)
return settings
def process_settings(settings, config): def process_settings(settings, config):
settings = base_process_settings(settings, config) settings = base_process_settings(settings, config)
settings = process_common(settings, config) settings = process_common(settings, config)
settings = process_sync_interface(settings, config)
if settings.include_queue:
settings = process_queue_backend(settings, config)
if settings.include_sync:
settings = process_sync_backend(settings, config)
settings = process_backend(settings, config) settings = process_backend(settings, config)
settings = process_session(settings, config) settings = process_session(settings, config)
if settings.include_sync:
settings = process_sync(settings, config)
if settings.include_queue:
settings = process_chaind_queue(settings, config)
settings = process_dispatch(settings, config)
settings = process_token(settings, config)
settings = process_socket(settings, config) settings = process_socket(settings, config)
return settings return settings