Implement chainlib cli util for tasker, retry, dispatcher
This commit is contained in:
		
							parent
							
								
									0c5c6146d6
								
							
						
					
					
						commit
						1bc6c5ace5
					
				| @ -21,7 +21,7 @@ def health(*args, **kwargs): | ||||
|     session = SessionBase.create_session() | ||||
| 
 | ||||
|     config = kwargs['config'] | ||||
|     chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC')) | ||||
|     chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC')) | ||||
|     logg.debug('check gas balance of gas gifter for chain {}'.format(chain_spec)) | ||||
| 
 | ||||
|     try: | ||||
|  | ||||
| @ -15,7 +15,7 @@ logg = logging.getLogger().getChild(__name__) | ||||
| def health(*args, **kwargs): | ||||
|     blocked = True | ||||
|     max_attempts = 5 | ||||
|     conn = RPCConnection.connect(kwargs['config'].get('CIC_CHAIN_SPEC'), tag='signer') | ||||
|     conn = RPCConnection.connect(kwargs['config'].get('CHAIN_SPEC'), tag='signer') | ||||
|     for i in range(max_attempts): | ||||
|         idx = i + 1 | ||||
|         logg.debug('attempt signer connection check {}/{}'.format(idx, max_attempts)) | ||||
|  | ||||
| @ -6,7 +6,10 @@ from chainlib.connection import ( | ||||
|         RPCConnection, | ||||
|         ConnType, | ||||
|         ) | ||||
| from chainlib.eth.connection import EthUnixSignerConnection | ||||
| from chainlib.eth.connection import ( | ||||
|         EthUnixSignerConnection, | ||||
|         EthHTTPSignerConnection, | ||||
|         ) | ||||
| from chainlib.chain import ChainSpec | ||||
| 
 | ||||
| logg = logging.getLogger(__name__) | ||||
| @ -25,11 +28,15 @@ class RPC: | ||||
| 
 | ||||
| 
 | ||||
|     @staticmethod | ||||
|     def from_config(config): | ||||
|     def from_config(config, use_signer=False): | ||||
|         chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC')) | ||||
| 
 | ||||
|         RPCConnection.register_location(config.get('RPC_HTTP_PROVIDER'), chain_spec, 'default') | ||||
|         if config.get('SIGNER_PROVIDER'): | ||||
|             RPCConnection.register_constructor(ConnType.UNIX, EthUnixSignerConnection, tag='signer') | ||||
|         if use_signer: | ||||
| 
 | ||||
|             RPCConnection.register_constructor(ConnType.UNIX, EthUnixSignerConnection, 'signer') | ||||
|             RPCConnection.register_constructor(ConnType.HTTP, EthHTTPSignerConnection, 'signer') | ||||
|             RPCConnection.register_constructor(ConnType.HTTP_SSL, EthHTTPSignerConnection, 'signer') | ||||
|             RPCConnection.register_location(config.get('SIGNER_PROVIDER'), chain_spec, 'signer') | ||||
|         rpc = RPC(chain_spec, config.get('RPC_HTTP_PROVIDER'), signer_provider=config.get('SIGNER_PROVIDER')) | ||||
|         logg.info('set up rpc: {}'.format(rpc)) | ||||
| @ -40,4 +47,40 @@ class RPC: | ||||
|         return 'RPC factory, chain {}, rpc {}, signer {}'.format(self.chain_spec, self.rpc_provider, self.signer_provider) | ||||
| 
 | ||||
| 
 | ||||
| 
 | ||||
| # TOOD: re-implement file backend option from omittec code: | ||||
| #broker = config.get('CELERY_BROKER_URL') | ||||
| #if broker[:4] == 'file': | ||||
| #    bq = tempfile.mkdtemp() | ||||
| #    bp = tempfile.mkdtemp() | ||||
| #    conf_update = { | ||||
| #            'broker_url': broker, | ||||
| #            'broker_transport_options': { | ||||
| #                'data_folder_in': bq, | ||||
| #                'data_folder_out': bq, | ||||
| #                'data_folder_processed': bp, | ||||
| #            }, | ||||
| #            } | ||||
| #    if config.true('CELERY_DEBUG'): | ||||
| #        conf_update['result_extended'] = True | ||||
| #    current_app.conf.update(conf_update) | ||||
| #    logg.warning('celery broker dirs queue i/o {} processed {}, will NOT be deleted on shutdown'.format(bq, bp)) | ||||
| #else: | ||||
| #    conf_update = { | ||||
| #            'broker_url': broker, | ||||
| #            } | ||||
| #    if config.true('CELERY_DEBUG'): | ||||
| #        conf_update['result_extended'] = True | ||||
| #    current_app.conf.update(conf_update) | ||||
| # | ||||
| #result = config.get('CELERY_RESULT_URL') | ||||
| #if result[:4] == 'file': | ||||
| #    rq = tempfile.mkdtemp() | ||||
| #    current_app.conf.update({ | ||||
| #        'result_backend': 'file://{}'.format(rq), | ||||
| #        }) | ||||
| #    logg.warning('celery backend store dir {} created, will NOT be deleted on shutdown'.format(rq)) | ||||
| #else: | ||||
| #    current_app.conf.update({ | ||||
| #        'result_backend': result, | ||||
| #        }) | ||||
| # | ||||
|  | ||||
| @ -1,4 +1,6 @@ | ||||
| [cic] | ||||
| registry_address =  | ||||
| trust_address = | ||||
| default_token_symbol = | ||||
| health_modules = cic_eth.check.db,cic_eth.check.redis,cic_eth.check.signer,cic_eth.check.gas | ||||
| run_dir = /run | ||||
|  | ||||
| @ -8,8 +8,7 @@ import sys | ||||
| import re | ||||
| import datetime | ||||
| 
 | ||||
| # third-party imports | ||||
| import confini | ||||
| # external imports | ||||
| import celery | ||||
| from cic_eth_registry import CICRegistry | ||||
| from chainlib.chain import ChainSpec | ||||
| @ -24,7 +23,7 @@ from chainqueue.error import NotLocalTxError | ||||
| from chainqueue.sql.state import set_reserved | ||||
| 
 | ||||
| # local imports | ||||
| import cic_eth | ||||
| import cic_eth.cli | ||||
| from cic_eth.db import SessionBase | ||||
| from cic_eth.db.enum import LockEnum | ||||
| from cic_eth.db import dsn_from_config | ||||
| @ -39,51 +38,30 @@ from cic_eth.error import ( | ||||
| logging.basicConfig(level=logging.WARNING) | ||||
| logg = logging.getLogger() | ||||
| 
 | ||||
| arg_flags = cic_eth.cli.argflag_std_read | ||||
| local_arg_flags = cic_eth.cli.argflag_local_sync | ||||
| argparser = cic_eth.cli.ArgumentParser(arg_flags) | ||||
| argparser.process_local_flags(local_arg_flags) | ||||
| args = argparser.parse_args() | ||||
| 
 | ||||
| config_dir = os.path.join('/usr/local/etc/cic-eth') | ||||
| config = cic_eth.cli.Config.from_args(args, arg_flags, local_arg_flags) | ||||
| 
 | ||||
| argparser = argparse.ArgumentParser(description='daemon that monitors transactions in new blocks') | ||||
| argparser.add_argument('-p', '--provider', default='http://localhost:8545', dest='p', type=str, help='rpc provider') | ||||
| argparser.add_argument('-c', type=str, default=config_dir, help='config root to use') | ||||
| argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='chain spec') | ||||
| argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration') | ||||
| argparser.add_argument('-q', type=str, default='cic-eth', help='celery queue to submit transaction tasks to') | ||||
| argparser.add_argument('-v', help='be verbose', action='store_true') | ||||
| argparser.add_argument('-vv', help='be more verbose', action='store_true') | ||||
| args = argparser.parse_args(sys.argv[1:]) | ||||
| 
 | ||||
| if args.v == True: | ||||
|     logging.getLogger().setLevel(logging.INFO) | ||||
| elif args.vv == True: | ||||
|     logging.getLogger().setLevel(logging.DEBUG) | ||||
| 
 | ||||
| config_dir = os.path.join(args.c) | ||||
| os.makedirs(config_dir, 0o777, True) | ||||
| config = confini.Config(config_dir, args.env_prefix) | ||||
| config.process() | ||||
| # override args | ||||
| args_override = { | ||||
|         'CIC_CHAIN_SPEC': getattr(args, 'i'), | ||||
|         'ETH_PROVIDER': getattr(args, 'p'), | ||||
|         } | ||||
| config.dict_override(args_override, 'cli flag') | ||||
| config.censor('PASSWORD', 'DATABASE') | ||||
| config.censor('PASSWORD', 'SSL') | ||||
| logg.debug('config loaded from {}:\n{}'.format(config_dir, config)) | ||||
| 
 | ||||
| app = celery.Celery(backend=config.get('CELERY_RESULT_URL'),  broker=config.get('CELERY_BROKER_URL')) | ||||
| 
 | ||||
| queue = args.q | ||||
| # connect to celery | ||||
| celery_app = cic_eth.cli.CeleryApp.from_config(config) | ||||
| 
 | ||||
| # connect to database | ||||
| dsn = dsn_from_config(config) | ||||
| SessionBase.connect(dsn, debug=config.true('DATABASE_DEBUG')) | ||||
| 
 | ||||
| chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC')) | ||||
| chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC')) | ||||
| 
 | ||||
| RPCConnection.register_location(config.get('ETH_PROVIDER'), chain_spec, tag='default') | ||||
| # set up rpc | ||||
| rpc = cic_eth.cli.RPC.from_config(config) | ||||
| conn = rpc.get_default() | ||||
| 
 | ||||
| run = True | ||||
| 
 | ||||
| 
 | ||||
| class DispatchSyncer: | ||||
| 
 | ||||
|     yield_delay = 0.0005 | ||||
|  | ||||
| @ -6,7 +6,6 @@ import argparse | ||||
| import re | ||||
| 
 | ||||
| # external imports | ||||
| import confini | ||||
| import celery | ||||
| from cic_eth_registry import CICRegistry | ||||
| from chainlib.chain import ChainSpec | ||||
| @ -14,6 +13,7 @@ from chainlib.connection import RPCConnection | ||||
| from chainsyncer.filter import SyncFilter | ||||
| 
 | ||||
| # local imports | ||||
| import cic_eth.cli | ||||
| from cic_eth.db import dsn_from_config | ||||
| from cic_eth.db import SessionBase | ||||
| from cic_eth.admin.ctrl import lock_send | ||||
| @ -25,66 +25,41 @@ from cic_eth.stat import init_chain_stat | ||||
| logging.basicConfig(level=logging.WARNING) | ||||
| logg = logging.getLogger() | ||||
| 
 | ||||
| config_dir = os.path.join('/usr/local/etc/cic-eth') | ||||
| 
 | ||||
| argparser = argparse.ArgumentParser(description='daemon that monitors transactions in new blocks') | ||||
| argparser.add_argument('-p', '--provider', dest='p', type=str, help='rpc provider') | ||||
| argparser.add_argument('-c', type=str, default=config_dir, help='config root to use') | ||||
| argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='chain spec') | ||||
| arg_flags = cic_eth.cli.argflag_std_read | ||||
| local_arg_flags = cic_eth.cli.argflag_local_sync | ||||
| argparser = cic_eth.cli.ArgumentParser(arg_flags) | ||||
| argparser.add_argument('--batch-size', dest='batch_size', type=int, default=50, help='max amount of txs to resend per iteration') | ||||
| argparser.add_argument('--retry-delay', dest='retry_delay', type=int, help='seconds to wait for retrying a transaction that is marked as sent') | ||||
| argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration') | ||||
| argparser.add_argument('-q', type=str, default='cic-eth', help='celery queue to submit transaction tasks to') | ||||
| argparser.add_argument('-v', help='be verbose', action='store_true') | ||||
| argparser.add_argument('-vv', help='be more verbose', action='store_true') | ||||
| args = argparser.parse_args(sys.argv[1:]) | ||||
| argparser.add_argument('--retry-delay', dest='retry_delay', type=int, default=20, help='seconds to wait for retrying a transaction that is marked as sent') | ||||
| argparser.process_local_flags(local_arg_flags) | ||||
| args = argparser.parse_args() | ||||
| 
 | ||||
| 
 | ||||
| if args.v == True: | ||||
|     logging.getLogger().setLevel(logging.INFO) | ||||
| elif args.vv == True: | ||||
|     logging.getLogger().setLevel(logging.DEBUG) | ||||
| 
 | ||||
| config_dir = os.path.join(args.c) | ||||
| os.makedirs(config_dir, 0o777, True) | ||||
| config = confini.Config(config_dir, args.env_prefix) | ||||
| config.process() | ||||
| # override args | ||||
| args_override = { | ||||
|         'ETH_PROVIDER': getattr(args, 'p'), | ||||
|         'CIC_CHAIN_SPEC': getattr(args, 'i'), | ||||
|         'CIC_TX_RETRY_DELAY': getattr(args, 'retry_delay'), | ||||
| extra_args = { | ||||
|     'retry_delay': 'RETRY_DELAY', | ||||
|     'batch_size': 'RETRY_BATCH_SIZE', | ||||
|         } | ||||
| config.dict_override(args_override, 'cli flag') | ||||
| config.censor('PASSWORD', 'DATABASE') | ||||
| config.censor('PASSWORD', 'SSL') | ||||
| logg.debug('config loaded from {}:\n{}'.format(config_dir, config)) | ||||
| config.add(args.batch_size, '_BATCH_SIZE', True) | ||||
| 
 | ||||
| app = celery.Celery(backend=config.get('CELERY_RESULT_URL'),  broker=config.get('CELERY_BROKER_URL')) | ||||
| 
 | ||||
| queue = args.q | ||||
| 
 | ||||
| chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC')) | ||||
| 
 | ||||
| RPCConnection.register_location(config.get('ETH_PROVIDER'), chain_spec, tag='default') | ||||
| config = cic_eth.cli.Config.from_args(args, arg_flags, local_arg_flags, extra_args=extra_args) | ||||
| 
 | ||||
| # connect to database | ||||
| dsn = dsn_from_config(config) | ||||
| SessionBase.connect(dsn, debug=config.true('DATABASE_DEBUG')) | ||||
| 
 | ||||
| chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC')) | ||||
| 
 | ||||
| # set up rpc | ||||
| rpc = cic_eth.cli.RPC.from_config(config) | ||||
| conn = rpc.get_default() | ||||
| 
 | ||||
| 
 | ||||
| def main():  | ||||
|     conn = RPCConnection.connect(chain_spec, 'default') | ||||
| 
 | ||||
|     straggler_delay = int(config.get('CIC_TX_RETRY_DELAY')) | ||||
|     straggler_delay = int(config.get('RETRY_DELAY')) | ||||
|     loop_interval = config.get('SYNCER_LOOP_INTERVAL') | ||||
|     if loop_interval == None: | ||||
|         stat = init_chain_stat(conn) | ||||
|         loop_interval = stat.block_average() | ||||
| 
 | ||||
|     syncer = RetrySyncer(conn, chain_spec, straggler_delay, batch_size=config.get('_BATCH_SIZE')) | ||||
|     syncer = RetrySyncer(conn, chain_spec, cic_eth.cli.chain_interface, straggler_delay, batch_size=config.get('RETRY_BATCH_SIZE')) | ||||
|     syncer.backend.set(0, 0) | ||||
|     fltr = StragglerFilter(chain_spec, queue=queue) | ||||
|     fltr = StragglerFilter(chain_spec, queue=config.get('CELERY_QUEUE')) | ||||
|     syncer.add_filter(fltr) | ||||
|     syncer.loop(int(loop_interval), conn) | ||||
| 
 | ||||
|  | ||||
| @ -21,14 +21,17 @@ from chainlib.eth.connection import ( | ||||
|         EthUnixSignerConnection, | ||||
|         EthHTTPSignerConnection, | ||||
|         ) | ||||
| from chainlib.eth.address import to_checksum_address | ||||
| from chainlib.chain import ChainSpec | ||||
| from chainqueue.db.models.otx import Otx | ||||
| from cic_eth_registry.error import UnknownContractError | ||||
| from cic_eth_registry.erc20 import ERC20Token | ||||
| from hexathon import add_0x | ||||
| import liveness.linux | ||||
| 
 | ||||
| 
 | ||||
| # local imports | ||||
| import cic_eth.cli | ||||
| from cic_eth.eth import ( | ||||
|         erc20, | ||||
|         tx, | ||||
| @ -70,114 +73,53 @@ from cic_eth.task import BaseTask | ||||
| logging.basicConfig(level=logging.WARNING) | ||||
| logg = logging.getLogger() | ||||
| 
 | ||||
| script_dir = os.path.dirname(os.path.realpath(__file__)) | ||||
| 
 | ||||
| config_dir = os.path.join('/usr/local/etc/cic-eth') | ||||
| 
 | ||||
| argparser = argparse.ArgumentParser() | ||||
| argparser.add_argument('-p', '--provider', dest='p', type=str, help='rpc provider') | ||||
| argparser.add_argument('-c', type=str, default=config_dir, help='config file') | ||||
| argparser.add_argument('-q', type=str, default='cic-eth', help='queue name for worker tasks') | ||||
| argparser.add_argument('-r', type=str, help='CIC registry address') | ||||
| arg_flags = cic_eth.cli.argflag_std_read | ||||
| local_arg_flags = cic_eth.cli.argflag_local_sync | ||||
| argparser = cic_eth.cli.ArgumentParser(arg_flags) | ||||
| argparser.process_local_flags(local_arg_flags) | ||||
| argparser.add_argument('--default-token-symbol', dest='default_token_symbol', type=str, help='Symbol of default token to use') | ||||
| argparser.add_argument('--trace-queue-status', default=None, dest='trace_queue_status', action='store_true', help='set to perist all queue entry status changes to storage') | ||||
| argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='chain spec') | ||||
| argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration') | ||||
| argparser.add_argument('--aux-all', action='store_true', help='include tasks from all submodules from the aux module path') | ||||
| argparser.add_argument('--aux', action='append', type=str, default=[], help='add single submodule from the aux module path') | ||||
| argparser.add_argument('-v', action='store_true', help='be verbose') | ||||
| argparser.add_argument('-vv', action='store_true', help='be more verbose') | ||||
| args = argparser.parse_args() | ||||
| 
 | ||||
| if args.vv: | ||||
|     logging.getLogger().setLevel(logging.DEBUG) | ||||
| elif args.v: | ||||
|     logging.getLogger().setLevel(logging.INFO) | ||||
| 
 | ||||
| config = confini.Config(args.c, args.env_prefix) | ||||
| config.process() | ||||
| # override args | ||||
| args_override = { | ||||
|         'CIC_CHAIN_SPEC': getattr(args, 'i'), | ||||
|         'CIC_REGISTRY_ADDRESS': getattr(args, 'r'), | ||||
|         'CIC_DEFAULT_TOKEN_SYMBOL': getattr(args, 'default_token_symbol'), | ||||
|         'ETH_PROVIDER': getattr(args, 'p'), | ||||
|         'TASKS_TRACE_QUEUE_STATUS': getattr(args, 'trace_queue_status'), | ||||
| # process config | ||||
| extra_args = { | ||||
|     'default_token_symbol': 'CIC_DEFAULT_TOKEN_SYMBOL', | ||||
|     'aux_all': None, | ||||
|     'aux': None, | ||||
|     'trace_queue_status': 'TASKS_TRACE_QUEUE_STATUS', | ||||
|         } | ||||
| config.add(args.q, '_CELERY_QUEUE', True) | ||||
| config.dict_override(args_override, 'cli flag') | ||||
| config.censor('PASSWORD', 'DATABASE') | ||||
| config.censor('PASSWORD', 'SSL') | ||||
| logg.debug('config loaded from {}:\n{}'.format(args.c, config)) | ||||
| config = cic_eth.cli.Config.from_args(args, arg_flags, local_arg_flags) | ||||
| 
 | ||||
| health_modules = config.get('CIC_HEALTH_MODULES', []) | ||||
| if len(health_modules) != 0: | ||||
|     health_modules = health_modules.split(',') | ||||
| logg.debug('health mods {}'.format(health_modules)) | ||||
| # connect to celery | ||||
| celery_app = cic_eth.cli.CeleryApp.from_config(config) | ||||
| 
 | ||||
| # set up rpc | ||||
| rpc = cic_eth.cli.RPC.from_config(config, use_signer=True) | ||||
| conn = rpc.get_default() | ||||
| 
 | ||||
| 
 | ||||
| # connect to database | ||||
| dsn = dsn_from_config(config) | ||||
| SessionBase.connect(dsn, pool_size=int(config.get('DATABASE_POOL_SIZE')), debug=config.true('DATABASE_DEBUG')) | ||||
| 
 | ||||
| 
 | ||||
| # set up celery | ||||
| current_app = celery.Celery(__name__) | ||||
| 
 | ||||
| broker = config.get('CELERY_BROKER_URL') | ||||
| if broker[:4] == 'file': | ||||
|     bq = tempfile.mkdtemp() | ||||
|     bp = tempfile.mkdtemp() | ||||
|     conf_update = { | ||||
|             'broker_url': broker, | ||||
|             'broker_transport_options': { | ||||
|                 'data_folder_in': bq, | ||||
|                 'data_folder_out': bq, | ||||
|                 'data_folder_processed': bp, | ||||
|             }, | ||||
|             } | ||||
|     if config.true('CELERY_DEBUG'): | ||||
|         conf_update['result_extended'] = True | ||||
|     current_app.conf.update(conf_update) | ||||
|     logg.warning('celery broker dirs queue i/o {} processed {}, will NOT be deleted on shutdown'.format(bq, bp)) | ||||
| else: | ||||
|     conf_update = { | ||||
|             'broker_url': broker, | ||||
|             } | ||||
|     if config.true('CELERY_DEBUG'): | ||||
|         conf_update['result_extended'] = True | ||||
|     current_app.conf.update(conf_update) | ||||
| 
 | ||||
| result = config.get('CELERY_RESULT_URL') | ||||
| if result[:4] == 'file': | ||||
|     rq = tempfile.mkdtemp() | ||||
|     current_app.conf.update({ | ||||
|         'result_backend': 'file://{}'.format(rq), | ||||
|         }) | ||||
|     logg.warning('celery backend store dir {} created, will NOT be deleted on shutdown'.format(rq)) | ||||
| else: | ||||
|     current_app.conf.update({ | ||||
|         'result_backend': result, | ||||
|         }) | ||||
| 
 | ||||
| chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC')) | ||||
| RPCConnection.register_constructor(ConnType.UNIX, EthUnixSignerConnection, 'signer') | ||||
| RPCConnection.register_constructor(ConnType.HTTP, EthHTTPSignerConnection, 'signer') | ||||
| RPCConnection.register_constructor(ConnType.HTTP_SSL, EthHTTPSignerConnection, 'signer') | ||||
| RPCConnection.register_location(config.get('ETH_PROVIDER'), chain_spec, 'default') | ||||
| RPCConnection.register_location(config.get('SIGNER_SOCKET_PATH'), chain_spec, 'signer') | ||||
| 
 | ||||
| Otx.tracing = config.true('TASKS_TRACE_QUEUE_STATUS') | ||||
| 
 | ||||
| #import cic_eth.checks.gas | ||||
| #if not cic_eth.checks.gas.health(config=config): | ||||
| #    raise RuntimeError() | ||||
| 
 | ||||
| # execute health checks | ||||
| # TODO: health should be separate service with endpoint that can be queried | ||||
| health_modules = config.get('CIC_HEALTH_MODULES', []) | ||||
| if len(health_modules) != 0: | ||||
|     health_modules = health_modules.split(',') | ||||
| logg.debug('health mods {}'.format(health_modules)) | ||||
| liveness.linux.load(health_modules, rundir=config.get('CIC_RUN_DIR'), config=config, unit='cic-eth-tasker') | ||||
| 
 | ||||
| rpc = RPCConnection.connect(chain_spec, 'default') | ||||
| 
 | ||||
| # set up chain provisions | ||||
| chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC')) | ||||
| registry = None | ||||
| try: | ||||
|     registry = connect_registry(rpc, chain_spec, config.get('CIC_REGISTRY_ADDRESS')) | ||||
|     registry = connect_registry(conn, chain_spec, config.get('CIC_REGISTRY_ADDRESS')) | ||||
| except UnknownContractError as e: | ||||
|     logg.exception('Registry contract connection failed for {}: {}'.format(config.get('CIC_REGISTRY_ADDRESS'), e)) | ||||
|     sys.exit(1) | ||||
| @ -188,15 +130,15 @@ if trusted_addresses_src == None: | ||||
|     logg.critical('At least one trusted address must be declared in CIC_TRUST_ADDRESS') | ||||
|     sys.exit(1) | ||||
| trusted_addresses = trusted_addresses_src.split(',') | ||||
| for address in trusted_addresses: | ||||
| for i, address in enumerate(trusted_addresses): | ||||
|     if config.get('_UNSAFE'): | ||||
|         trusted_addresses[i] = to_checksum_address(address) | ||||
|     logg.info('using trusted address {}'.format(address)) | ||||
| connect_declarator(conn, chain_spec, trusted_addresses) | ||||
| connect_token_registry(conn, chain_spec) | ||||
| 
 | ||||
| connect_declarator(rpc, chain_spec, trusted_addresses) | ||||
| connect_token_registry(rpc, chain_spec) | ||||
| 
 | ||||
| # detect aux  | ||||
| # detect auxiliary task modules (plugins) | ||||
| # TODO: move to separate file | ||||
| #aux_dir = os.path.join(script_dir, '..', '..', 'aux') | ||||
| aux = [] | ||||
| if args.aux_all: | ||||
|     if len(args.aux) > 0: | ||||
| @ -249,36 +191,24 @@ elif len(args.aux) > 0: | ||||
| for v in aux: | ||||
|     mname = 'cic_eth_aux.' + v | ||||
|     mod = importlib.import_module(mname) | ||||
|     mod.aux_setup(rpc, config) | ||||
|     mod.aux_setup(conn, config) | ||||
|     logg.info('loaded aux module {}'.format(mname)) | ||||
| 
 | ||||
| 
 | ||||
| def main(): | ||||
|     argv = ['worker'] | ||||
|     if args.vv: | ||||
|         argv.append('--loglevel=DEBUG') | ||||
|     elif args.v: | ||||
|         argv.append('--loglevel=INFO') | ||||
|     log_level = logg.getEffectiveLevel() | ||||
|     log_level_name = logging.getLevelName(log_level) | ||||
|     argv.append('--loglevel=' + log_level_name) | ||||
|     argv.append('-Q') | ||||
|     argv.append(args.q) | ||||
|     argv.append(config.get('CELERY_QUEUE')) | ||||
|     argv.append('-n') | ||||
|     argv.append(args.q) | ||||
| 
 | ||||
| #    if config.true('SSL_ENABLE_CLIENT'): | ||||
| #        Callback.ssl = True | ||||
| #        Callback.ssl_cert_file = config.get('SSL_CERT_FILE') | ||||
| #        Callback.ssl_key_file = config.get('SSL_KEY_FILE') | ||||
| #        Callback.ssl_password = config.get('SSL_PASSWORD') | ||||
| # | ||||
| #    if config.get('SSL_CA_FILE') != '': | ||||
| #        Callback.ssl_ca_file = config.get('SSL_CA_FILE') | ||||
| 
 | ||||
|     rpc = RPCConnection.connect(chain_spec, 'default') | ||||
|     argv.append(config.get('CELERY_QUEUE')) | ||||
| 
 | ||||
|     BaseTask.default_token_symbol = config.get('CIC_DEFAULT_TOKEN_SYMBOL') | ||||
|     BaseTask.default_token_address = registry.by_name(BaseTask.default_token_symbol) | ||||
|     default_token = ERC20Token(chain_spec, rpc, BaseTask.default_token_address) | ||||
|     default_token.load(rpc) | ||||
|     default_token = ERC20Token(chain_spec, conn, BaseTask.default_token_address) | ||||
|     default_token.load(conn) | ||||
|     BaseTask.default_token_decimals = default_token.decimals | ||||
|     BaseTask.default_token_name = default_token.name | ||||
| 
 | ||||
| @ -286,13 +216,13 @@ def main(): | ||||
|     logg.info('default token set to {} {}'.format(BaseTask.default_token_symbol, BaseTask.default_token_address)) | ||||
|     | ||||
|     liveness.linux.set(rundir=config.get('CIC_RUN_DIR')) | ||||
|     current_app.worker_main(argv) | ||||
|     celery_app.worker_main(argv) | ||||
|     liveness.linux.reset(rundir=config.get('CIC_RUN_DIR')) | ||||
| 
 | ||||
| 
 | ||||
| @celery.signals.eventlet_pool_postshutdown.connect | ||||
| def shutdown(sender=None, headers=None, body=None, **kwargs): | ||||
|     logg.warning('in shudown event hook') | ||||
|     logg.warning('in shutdown event hook') | ||||
| 
 | ||||
| 
 | ||||
| if __name__ == '__main__': | ||||
|  | ||||
| @ -46,8 +46,6 @@ from cic_eth.registry import ( | ||||
| 
 | ||||
| logging.basicConfig(level=logging.WARNING) | ||||
| logg = logging.getLogger() | ||||
| logging.getLogger('confini').setLevel(logging.WARNING) | ||||
| logging.getLogger('gnupg').setLevel(logging.WARNING) | ||||
| 
 | ||||
| arg_flags = cic_eth.cli.argflag_std_read | ||||
| local_arg_flags = cic_eth.cli.argflag_local_sync | ||||
|  | ||||
| @ -3,7 +3,7 @@ import logging | ||||
| import datetime | ||||
| 
 | ||||
| # external imports | ||||
| from chainsyncer.driver import HeadSyncer | ||||
| from chainsyncer.driver.head import HeadSyncer | ||||
| from chainsyncer.backend.memory import MemBackend | ||||
| from chainsyncer.error import NoBlockForYou | ||||
| from chainlib.eth.block import ( | ||||
| @ -39,9 +39,9 @@ class DbSessionMemBackend(MemBackend): | ||||
| 
 | ||||
| class RetrySyncer(HeadSyncer): | ||||
| 
 | ||||
|     def __init__(self, conn, chain_spec, stalled_grace_seconds, batch_size=50, failed_grace_seconds=None): | ||||
|     def __init__(self, conn, chain_spec, chain_interface, stalled_grace_seconds, batch_size=50, failed_grace_seconds=None): | ||||
|         backend = DbSessionMemBackend(chain_spec, None) | ||||
|         super(RetrySyncer, self).__init__(backend) | ||||
|         super(RetrySyncer, self).__init__(backend, chain_interface) | ||||
|         self.chain_spec = chain_spec | ||||
|         if failed_grace_seconds == None: | ||||
|             failed_grace_seconds = stalled_grace_seconds | ||||
|  | ||||
		Loading…
	
		Reference in New Issue
	
	Block a user