From 1bc6c5ace587f5565efb485cc10e8616e7016488 Mon Sep 17 00:00:00 2001 From: nolash Date: Sat, 24 Jul 2021 17:28:03 +0200 Subject: [PATCH] Implement chainlib cli util for tasker, retry, dispatcher --- apps/cic-eth/cic_eth/check/gas.py | 2 +- apps/cic-eth/cic_eth/check/signer.py | 2 +- apps/cic-eth/cic_eth/cli/rpc.py | 53 +++++- apps/cic-eth/cic_eth/data/config/cic.ini | 2 + .../cic_eth/runnable/daemons/dispatcher.py | 54 ++---- .../cic-eth/cic_eth/runnable/daemons/retry.py | 67 +++---- .../cic_eth/runnable/daemons/tasker.py | 164 +++++------------- .../cic_eth/runnable/daemons/tracker.py | 2 - apps/cic-eth/cic_eth/sync/retry.py | 6 +- 9 files changed, 139 insertions(+), 213 deletions(-) diff --git a/apps/cic-eth/cic_eth/check/gas.py b/apps/cic-eth/cic_eth/check/gas.py index b1646fd6..4add981b 100644 --- a/apps/cic-eth/cic_eth/check/gas.py +++ b/apps/cic-eth/cic_eth/check/gas.py @@ -21,7 +21,7 @@ def health(*args, **kwargs): session = SessionBase.create_session() config = kwargs['config'] - chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC')) + chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC')) logg.debug('check gas balance of gas gifter for chain {}'.format(chain_spec)) try: diff --git a/apps/cic-eth/cic_eth/check/signer.py b/apps/cic-eth/cic_eth/check/signer.py index 87b7785f..0395f0a1 100644 --- a/apps/cic-eth/cic_eth/check/signer.py +++ b/apps/cic-eth/cic_eth/check/signer.py @@ -15,7 +15,7 @@ logg = logging.getLogger().getChild(__name__) def health(*args, **kwargs): blocked = True max_attempts = 5 - conn = RPCConnection.connect(kwargs['config'].get('CIC_CHAIN_SPEC'), tag='signer') + conn = RPCConnection.connect(kwargs['config'].get('CHAIN_SPEC'), tag='signer') for i in range(max_attempts): idx = i + 1 logg.debug('attempt signer connection check {}/{}'.format(idx, max_attempts)) diff --git a/apps/cic-eth/cic_eth/cli/rpc.py b/apps/cic-eth/cic_eth/cli/rpc.py index 7f6500ae..cc073767 100644 --- a/apps/cic-eth/cic_eth/cli/rpc.py +++ b/apps/cic-eth/cic_eth/cli/rpc.py @@ -6,7 +6,10 @@ from chainlib.connection import ( RPCConnection, ConnType, ) -from chainlib.eth.connection import EthUnixSignerConnection +from chainlib.eth.connection import ( + EthUnixSignerConnection, + EthHTTPSignerConnection, + ) from chainlib.chain import ChainSpec logg = logging.getLogger(__name__) @@ -25,11 +28,15 @@ class RPC: @staticmethod - def from_config(config): + def from_config(config, use_signer=False): chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC')) + RPCConnection.register_location(config.get('RPC_HTTP_PROVIDER'), chain_spec, 'default') - if config.get('SIGNER_PROVIDER'): - RPCConnection.register_constructor(ConnType.UNIX, EthUnixSignerConnection, tag='signer') + if use_signer: + + RPCConnection.register_constructor(ConnType.UNIX, EthUnixSignerConnection, 'signer') + RPCConnection.register_constructor(ConnType.HTTP, EthHTTPSignerConnection, 'signer') + RPCConnection.register_constructor(ConnType.HTTP_SSL, EthHTTPSignerConnection, 'signer') RPCConnection.register_location(config.get('SIGNER_PROVIDER'), chain_spec, 'signer') rpc = RPC(chain_spec, config.get('RPC_HTTP_PROVIDER'), signer_provider=config.get('SIGNER_PROVIDER')) logg.info('set up rpc: {}'.format(rpc)) @@ -40,4 +47,40 @@ class RPC: return 'RPC factory, chain {}, rpc {}, signer {}'.format(self.chain_spec, self.rpc_provider, self.signer_provider) - +# TOOD: re-implement file backend option from omittec code: +#broker = config.get('CELERY_BROKER_URL') +#if broker[:4] == 'file': +# bq = tempfile.mkdtemp() +# bp = tempfile.mkdtemp() +# conf_update = { +# 'broker_url': broker, +# 'broker_transport_options': { +# 'data_folder_in': bq, +# 'data_folder_out': bq, +# 'data_folder_processed': bp, +# }, +# } +# if config.true('CELERY_DEBUG'): +# conf_update['result_extended'] = True +# current_app.conf.update(conf_update) +# logg.warning('celery broker dirs queue i/o {} processed {}, will NOT be deleted on shutdown'.format(bq, bp)) +#else: +# conf_update = { +# 'broker_url': broker, +# } +# if config.true('CELERY_DEBUG'): +# conf_update['result_extended'] = True +# current_app.conf.update(conf_update) +# +#result = config.get('CELERY_RESULT_URL') +#if result[:4] == 'file': +# rq = tempfile.mkdtemp() +# current_app.conf.update({ +# 'result_backend': 'file://{}'.format(rq), +# }) +# logg.warning('celery backend store dir {} created, will NOT be deleted on shutdown'.format(rq)) +#else: +# current_app.conf.update({ +# 'result_backend': result, +# }) +# diff --git a/apps/cic-eth/cic_eth/data/config/cic.ini b/apps/cic-eth/cic_eth/data/config/cic.ini index b4275352..103566ff 100644 --- a/apps/cic-eth/cic_eth/data/config/cic.ini +++ b/apps/cic-eth/cic_eth/data/config/cic.ini @@ -1,4 +1,6 @@ [cic] registry_address = trust_address = +default_token_symbol = health_modules = cic_eth.check.db,cic_eth.check.redis,cic_eth.check.signer,cic_eth.check.gas +run_dir = /run diff --git a/apps/cic-eth/cic_eth/runnable/daemons/dispatcher.py b/apps/cic-eth/cic_eth/runnable/daemons/dispatcher.py index 7e22e911..4ba15e7c 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/dispatcher.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/dispatcher.py @@ -8,8 +8,7 @@ import sys import re import datetime -# third-party imports -import confini +# external imports import celery from cic_eth_registry import CICRegistry from chainlib.chain import ChainSpec @@ -24,7 +23,7 @@ from chainqueue.error import NotLocalTxError from chainqueue.sql.state import set_reserved # local imports -import cic_eth +import cic_eth.cli from cic_eth.db import SessionBase from cic_eth.db.enum import LockEnum from cic_eth.db import dsn_from_config @@ -39,51 +38,30 @@ from cic_eth.error import ( logging.basicConfig(level=logging.WARNING) logg = logging.getLogger() +arg_flags = cic_eth.cli.argflag_std_read +local_arg_flags = cic_eth.cli.argflag_local_sync +argparser = cic_eth.cli.ArgumentParser(arg_flags) +argparser.process_local_flags(local_arg_flags) +args = argparser.parse_args() -config_dir = os.path.join('/usr/local/etc/cic-eth') +config = cic_eth.cli.Config.from_args(args, arg_flags, local_arg_flags) -argparser = argparse.ArgumentParser(description='daemon that monitors transactions in new blocks') -argparser.add_argument('-p', '--provider', default='http://localhost:8545', dest='p', type=str, help='rpc provider') -argparser.add_argument('-c', type=str, default=config_dir, help='config root to use') -argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='chain spec') -argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration') -argparser.add_argument('-q', type=str, default='cic-eth', help='celery queue to submit transaction tasks to') -argparser.add_argument('-v', help='be verbose', action='store_true') -argparser.add_argument('-vv', help='be more verbose', action='store_true') -args = argparser.parse_args(sys.argv[1:]) - -if args.v == True: - logging.getLogger().setLevel(logging.INFO) -elif args.vv == True: - logging.getLogger().setLevel(logging.DEBUG) - -config_dir = os.path.join(args.c) -os.makedirs(config_dir, 0o777, True) -config = confini.Config(config_dir, args.env_prefix) -config.process() -# override args -args_override = { - 'CIC_CHAIN_SPEC': getattr(args, 'i'), - 'ETH_PROVIDER': getattr(args, 'p'), - } -config.dict_override(args_override, 'cli flag') -config.censor('PASSWORD', 'DATABASE') -config.censor('PASSWORD', 'SSL') -logg.debug('config loaded from {}:\n{}'.format(config_dir, config)) - -app = celery.Celery(backend=config.get('CELERY_RESULT_URL'), broker=config.get('CELERY_BROKER_URL')) - -queue = args.q +# connect to celery +celery_app = cic_eth.cli.CeleryApp.from_config(config) +# connect to database dsn = dsn_from_config(config) SessionBase.connect(dsn, debug=config.true('DATABASE_DEBUG')) -chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC')) +chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC')) -RPCConnection.register_location(config.get('ETH_PROVIDER'), chain_spec, tag='default') +# set up rpc +rpc = cic_eth.cli.RPC.from_config(config) +conn = rpc.get_default() run = True + class DispatchSyncer: yield_delay = 0.0005 diff --git a/apps/cic-eth/cic_eth/runnable/daemons/retry.py b/apps/cic-eth/cic_eth/runnable/daemons/retry.py index 8974c33d..b69c64f7 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/retry.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/retry.py @@ -6,7 +6,6 @@ import argparse import re # external imports -import confini import celery from cic_eth_registry import CICRegistry from chainlib.chain import ChainSpec @@ -14,6 +13,7 @@ from chainlib.connection import RPCConnection from chainsyncer.filter import SyncFilter # local imports +import cic_eth.cli from cic_eth.db import dsn_from_config from cic_eth.db import SessionBase from cic_eth.admin.ctrl import lock_send @@ -25,66 +25,41 @@ from cic_eth.stat import init_chain_stat logging.basicConfig(level=logging.WARNING) logg = logging.getLogger() -config_dir = os.path.join('/usr/local/etc/cic-eth') - -argparser = argparse.ArgumentParser(description='daemon that monitors transactions in new blocks') -argparser.add_argument('-p', '--provider', dest='p', type=str, help='rpc provider') -argparser.add_argument('-c', type=str, default=config_dir, help='config root to use') -argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='chain spec') +arg_flags = cic_eth.cli.argflag_std_read +local_arg_flags = cic_eth.cli.argflag_local_sync +argparser = cic_eth.cli.ArgumentParser(arg_flags) argparser.add_argument('--batch-size', dest='batch_size', type=int, default=50, help='max amount of txs to resend per iteration') -argparser.add_argument('--retry-delay', dest='retry_delay', type=int, help='seconds to wait for retrying a transaction that is marked as sent') -argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration') -argparser.add_argument('-q', type=str, default='cic-eth', help='celery queue to submit transaction tasks to') -argparser.add_argument('-v', help='be verbose', action='store_true') -argparser.add_argument('-vv', help='be more verbose', action='store_true') -args = argparser.parse_args(sys.argv[1:]) +argparser.add_argument('--retry-delay', dest='retry_delay', type=int, default=20, help='seconds to wait for retrying a transaction that is marked as sent') +argparser.process_local_flags(local_arg_flags) +args = argparser.parse_args() - -if args.v == True: - logging.getLogger().setLevel(logging.INFO) -elif args.vv == True: - logging.getLogger().setLevel(logging.DEBUG) - -config_dir = os.path.join(args.c) -os.makedirs(config_dir, 0o777, True) -config = confini.Config(config_dir, args.env_prefix) -config.process() -# override args -args_override = { - 'ETH_PROVIDER': getattr(args, 'p'), - 'CIC_CHAIN_SPEC': getattr(args, 'i'), - 'CIC_TX_RETRY_DELAY': getattr(args, 'retry_delay'), +extra_args = { + 'retry_delay': 'RETRY_DELAY', + 'batch_size': 'RETRY_BATCH_SIZE', } -config.dict_override(args_override, 'cli flag') -config.censor('PASSWORD', 'DATABASE') -config.censor('PASSWORD', 'SSL') -logg.debug('config loaded from {}:\n{}'.format(config_dir, config)) -config.add(args.batch_size, '_BATCH_SIZE', True) - -app = celery.Celery(backend=config.get('CELERY_RESULT_URL'), broker=config.get('CELERY_BROKER_URL')) - -queue = args.q - -chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC')) - -RPCConnection.register_location(config.get('ETH_PROVIDER'), chain_spec, tag='default') +config = cic_eth.cli.Config.from_args(args, arg_flags, local_arg_flags, extra_args=extra_args) +# connect to database dsn = dsn_from_config(config) SessionBase.connect(dsn, debug=config.true('DATABASE_DEBUG')) +chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC')) + +# set up rpc +rpc = cic_eth.cli.RPC.from_config(config) +conn = rpc.get_default() + def main(): - conn = RPCConnection.connect(chain_spec, 'default') - - straggler_delay = int(config.get('CIC_TX_RETRY_DELAY')) + straggler_delay = int(config.get('RETRY_DELAY')) loop_interval = config.get('SYNCER_LOOP_INTERVAL') if loop_interval == None: stat = init_chain_stat(conn) loop_interval = stat.block_average() - syncer = RetrySyncer(conn, chain_spec, straggler_delay, batch_size=config.get('_BATCH_SIZE')) + syncer = RetrySyncer(conn, chain_spec, cic_eth.cli.chain_interface, straggler_delay, batch_size=config.get('RETRY_BATCH_SIZE')) syncer.backend.set(0, 0) - fltr = StragglerFilter(chain_spec, queue=queue) + fltr = StragglerFilter(chain_spec, queue=config.get('CELERY_QUEUE')) syncer.add_filter(fltr) syncer.loop(int(loop_interval), conn) diff --git a/apps/cic-eth/cic_eth/runnable/daemons/tasker.py b/apps/cic-eth/cic_eth/runnable/daemons/tasker.py index 837da5cb..22dc7d94 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/tasker.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/tasker.py @@ -21,14 +21,17 @@ from chainlib.eth.connection import ( EthUnixSignerConnection, EthHTTPSignerConnection, ) +from chainlib.eth.address import to_checksum_address from chainlib.chain import ChainSpec from chainqueue.db.models.otx import Otx from cic_eth_registry.error import UnknownContractError from cic_eth_registry.erc20 import ERC20Token +from hexathon import add_0x import liveness.linux # local imports +import cic_eth.cli from cic_eth.eth import ( erc20, tx, @@ -70,114 +73,53 @@ from cic_eth.task import BaseTask logging.basicConfig(level=logging.WARNING) logg = logging.getLogger() -script_dir = os.path.dirname(os.path.realpath(__file__)) - -config_dir = os.path.join('/usr/local/etc/cic-eth') - -argparser = argparse.ArgumentParser() -argparser.add_argument('-p', '--provider', dest='p', type=str, help='rpc provider') -argparser.add_argument('-c', type=str, default=config_dir, help='config file') -argparser.add_argument('-q', type=str, default='cic-eth', help='queue name for worker tasks') -argparser.add_argument('-r', type=str, help='CIC registry address') +arg_flags = cic_eth.cli.argflag_std_read +local_arg_flags = cic_eth.cli.argflag_local_sync +argparser = cic_eth.cli.ArgumentParser(arg_flags) +argparser.process_local_flags(local_arg_flags) argparser.add_argument('--default-token-symbol', dest='default_token_symbol', type=str, help='Symbol of default token to use') argparser.add_argument('--trace-queue-status', default=None, dest='trace_queue_status', action='store_true', help='set to perist all queue entry status changes to storage') -argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='chain spec') -argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration') argparser.add_argument('--aux-all', action='store_true', help='include tasks from all submodules from the aux module path') argparser.add_argument('--aux', action='append', type=str, default=[], help='add single submodule from the aux module path') -argparser.add_argument('-v', action='store_true', help='be verbose') -argparser.add_argument('-vv', action='store_true', help='be more verbose') args = argparser.parse_args() -if args.vv: - logging.getLogger().setLevel(logging.DEBUG) -elif args.v: - logging.getLogger().setLevel(logging.INFO) - -config = confini.Config(args.c, args.env_prefix) -config.process() -# override args -args_override = { - 'CIC_CHAIN_SPEC': getattr(args, 'i'), - 'CIC_REGISTRY_ADDRESS': getattr(args, 'r'), - 'CIC_DEFAULT_TOKEN_SYMBOL': getattr(args, 'default_token_symbol'), - 'ETH_PROVIDER': getattr(args, 'p'), - 'TASKS_TRACE_QUEUE_STATUS': getattr(args, 'trace_queue_status'), +# process config +extra_args = { + 'default_token_symbol': 'CIC_DEFAULT_TOKEN_SYMBOL', + 'aux_all': None, + 'aux': None, + 'trace_queue_status': 'TASKS_TRACE_QUEUE_STATUS', } -config.add(args.q, '_CELERY_QUEUE', True) -config.dict_override(args_override, 'cli flag') -config.censor('PASSWORD', 'DATABASE') -config.censor('PASSWORD', 'SSL') -logg.debug('config loaded from {}:\n{}'.format(args.c, config)) +config = cic_eth.cli.Config.from_args(args, arg_flags, local_arg_flags) -health_modules = config.get('CIC_HEALTH_MODULES', []) -if len(health_modules) != 0: - health_modules = health_modules.split(',') -logg.debug('health mods {}'.format(health_modules)) +# connect to celery +celery_app = cic_eth.cli.CeleryApp.from_config(config) +# set up rpc +rpc = cic_eth.cli.RPC.from_config(config, use_signer=True) +conn = rpc.get_default() # connect to database dsn = dsn_from_config(config) SessionBase.connect(dsn, pool_size=int(config.get('DATABASE_POOL_SIZE')), debug=config.true('DATABASE_DEBUG')) - - -# set up celery -current_app = celery.Celery(__name__) - -broker = config.get('CELERY_BROKER_URL') -if broker[:4] == 'file': - bq = tempfile.mkdtemp() - bp = tempfile.mkdtemp() - conf_update = { - 'broker_url': broker, - 'broker_transport_options': { - 'data_folder_in': bq, - 'data_folder_out': bq, - 'data_folder_processed': bp, - }, - } - if config.true('CELERY_DEBUG'): - conf_update['result_extended'] = True - current_app.conf.update(conf_update) - logg.warning('celery broker dirs queue i/o {} processed {}, will NOT be deleted on shutdown'.format(bq, bp)) -else: - conf_update = { - 'broker_url': broker, - } - if config.true('CELERY_DEBUG'): - conf_update['result_extended'] = True - current_app.conf.update(conf_update) - -result = config.get('CELERY_RESULT_URL') -if result[:4] == 'file': - rq = tempfile.mkdtemp() - current_app.conf.update({ - 'result_backend': 'file://{}'.format(rq), - }) - logg.warning('celery backend store dir {} created, will NOT be deleted on shutdown'.format(rq)) -else: - current_app.conf.update({ - 'result_backend': result, - }) - -chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC')) -RPCConnection.register_constructor(ConnType.UNIX, EthUnixSignerConnection, 'signer') -RPCConnection.register_constructor(ConnType.HTTP, EthHTTPSignerConnection, 'signer') -RPCConnection.register_constructor(ConnType.HTTP_SSL, EthHTTPSignerConnection, 'signer') -RPCConnection.register_location(config.get('ETH_PROVIDER'), chain_spec, 'default') -RPCConnection.register_location(config.get('SIGNER_SOCKET_PATH'), chain_spec, 'signer') - Otx.tracing = config.true('TASKS_TRACE_QUEUE_STATUS') -#import cic_eth.checks.gas -#if not cic_eth.checks.gas.health(config=config): -# raise RuntimeError() + +# execute health checks +# TODO: health should be separate service with endpoint that can be queried +health_modules = config.get('CIC_HEALTH_MODULES', []) +if len(health_modules) != 0: + health_modules = health_modules.split(',') +logg.debug('health mods {}'.format(health_modules)) liveness.linux.load(health_modules, rundir=config.get('CIC_RUN_DIR'), config=config, unit='cic-eth-tasker') -rpc = RPCConnection.connect(chain_spec, 'default') + +# set up chain provisions +chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC')) +registry = None try: - registry = connect_registry(rpc, chain_spec, config.get('CIC_REGISTRY_ADDRESS')) + registry = connect_registry(conn, chain_spec, config.get('CIC_REGISTRY_ADDRESS')) except UnknownContractError as e: logg.exception('Registry contract connection failed for {}: {}'.format(config.get('CIC_REGISTRY_ADDRESS'), e)) sys.exit(1) @@ -188,15 +130,15 @@ if trusted_addresses_src == None: logg.critical('At least one trusted address must be declared in CIC_TRUST_ADDRESS') sys.exit(1) trusted_addresses = trusted_addresses_src.split(',') -for address in trusted_addresses: +for i, address in enumerate(trusted_addresses): + if config.get('_UNSAFE'): + trusted_addresses[i] = to_checksum_address(address) logg.info('using trusted address {}'.format(address)) +connect_declarator(conn, chain_spec, trusted_addresses) +connect_token_registry(conn, chain_spec) -connect_declarator(rpc, chain_spec, trusted_addresses) -connect_token_registry(rpc, chain_spec) - -# detect aux +# detect auxiliary task modules (plugins) # TODO: move to separate file -#aux_dir = os.path.join(script_dir, '..', '..', 'aux') aux = [] if args.aux_all: if len(args.aux) > 0: @@ -249,36 +191,24 @@ elif len(args.aux) > 0: for v in aux: mname = 'cic_eth_aux.' + v mod = importlib.import_module(mname) - mod.aux_setup(rpc, config) + mod.aux_setup(conn, config) logg.info('loaded aux module {}'.format(mname)) def main(): argv = ['worker'] - if args.vv: - argv.append('--loglevel=DEBUG') - elif args.v: - argv.append('--loglevel=INFO') + log_level = logg.getEffectiveLevel() + log_level_name = logging.getLevelName(log_level) + argv.append('--loglevel=' + log_level_name) argv.append('-Q') - argv.append(args.q) + argv.append(config.get('CELERY_QUEUE')) argv.append('-n') - argv.append(args.q) - -# if config.true('SSL_ENABLE_CLIENT'): -# Callback.ssl = True -# Callback.ssl_cert_file = config.get('SSL_CERT_FILE') -# Callback.ssl_key_file = config.get('SSL_KEY_FILE') -# Callback.ssl_password = config.get('SSL_PASSWORD') -# -# if config.get('SSL_CA_FILE') != '': -# Callback.ssl_ca_file = config.get('SSL_CA_FILE') - - rpc = RPCConnection.connect(chain_spec, 'default') + argv.append(config.get('CELERY_QUEUE')) BaseTask.default_token_symbol = config.get('CIC_DEFAULT_TOKEN_SYMBOL') BaseTask.default_token_address = registry.by_name(BaseTask.default_token_symbol) - default_token = ERC20Token(chain_spec, rpc, BaseTask.default_token_address) - default_token.load(rpc) + default_token = ERC20Token(chain_spec, conn, BaseTask.default_token_address) + default_token.load(conn) BaseTask.default_token_decimals = default_token.decimals BaseTask.default_token_name = default_token.name @@ -286,13 +216,13 @@ def main(): logg.info('default token set to {} {}'.format(BaseTask.default_token_symbol, BaseTask.default_token_address)) liveness.linux.set(rundir=config.get('CIC_RUN_DIR')) - current_app.worker_main(argv) + celery_app.worker_main(argv) liveness.linux.reset(rundir=config.get('CIC_RUN_DIR')) @celery.signals.eventlet_pool_postshutdown.connect def shutdown(sender=None, headers=None, body=None, **kwargs): - logg.warning('in shudown event hook') + logg.warning('in shutdown event hook') if __name__ == '__main__': diff --git a/apps/cic-eth/cic_eth/runnable/daemons/tracker.py b/apps/cic-eth/cic_eth/runnable/daemons/tracker.py index 6d138dde..5117537c 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/tracker.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/tracker.py @@ -46,8 +46,6 @@ from cic_eth.registry import ( logging.basicConfig(level=logging.WARNING) logg = logging.getLogger() -logging.getLogger('confini').setLevel(logging.WARNING) -logging.getLogger('gnupg').setLevel(logging.WARNING) arg_flags = cic_eth.cli.argflag_std_read local_arg_flags = cic_eth.cli.argflag_local_sync diff --git a/apps/cic-eth/cic_eth/sync/retry.py b/apps/cic-eth/cic_eth/sync/retry.py index b311964d..9118ebf4 100644 --- a/apps/cic-eth/cic_eth/sync/retry.py +++ b/apps/cic-eth/cic_eth/sync/retry.py @@ -3,7 +3,7 @@ import logging import datetime # external imports -from chainsyncer.driver import HeadSyncer +from chainsyncer.driver.head import HeadSyncer from chainsyncer.backend.memory import MemBackend from chainsyncer.error import NoBlockForYou from chainlib.eth.block import ( @@ -39,9 +39,9 @@ class DbSessionMemBackend(MemBackend): class RetrySyncer(HeadSyncer): - def __init__(self, conn, chain_spec, stalled_grace_seconds, batch_size=50, failed_grace_seconds=None): + def __init__(self, conn, chain_spec, chain_interface, stalled_grace_seconds, batch_size=50, failed_grace_seconds=None): backend = DbSessionMemBackend(chain_spec, None) - super(RetrySyncer, self).__init__(backend) + super(RetrySyncer, self).__init__(backend, chain_interface) self.chain_spec = chain_spec if failed_grace_seconds == None: failed_grace_seconds = stalled_grace_seconds