Add celery cli args with defaults from redis

This commit is contained in:
nolash 2021-10-31 07:58:35 +01:00
parent 7a366edb9d
commit d88ae00b72
Signed by untrusted user who does not match committer: lash
GPG Key ID: 21D2E7BB88C2A746
9 changed files with 83 additions and 73 deletions

View File

@ -16,16 +16,22 @@ class ArgumentParser(BaseArgumentParser):
self.add_argument('--redis-port', dest='redis_port', type=int, help='redis host to use for task submission')
self.add_argument('--redis-db', dest='redis_db', type=int, help='redis db to use')
if local_arg_flags & CICFlag.REDIS_CALLBACK:
self.add_argument('--redis-host-callback', dest='redis_host_callback', default='localhost', type=str, help='redis host to use for callback')
self.add_argument('--redis-port-callback', dest='redis_port_callback', default=6379, type=int, help='redis port to use for callback')
self.add_argument('--redis-host-callback', dest='redis_host_callback', type=str, help='redis host to use for callback (defaults to redis host)')
self.add_argument('--redis-port-callback', dest='redis_port_callback', type=int, help='redis port to use for callback (defaults to redis port)')
self.add_argument('--redis-timeout', default=20.0, type=float, help='Redis callback timeout')
if local_arg_flags & CICFlag.CELERY:
self.add_argument('--celery-scheme', type=str, help='Celery broker scheme (defaults to "redis")')
self.add_argument('--celery-host', type=str, help='Celery broker host (defaults to redis host)')
self.add_argument('--celery-port', type=str, help='Celery broker port (defaults to redis port)')
self.add_argument('--celery-db', type=int, help='Celery broker db (defaults to redis db)')
self.add_argument('--celery-result-scheme', type=str, help='Celery result backend scheme (defaults to celery broker scheme)')
self.add_argument('--celery-result-host', type=str, help='Celery result backend host (defaults to celery broker host)')
self.add_argument('--celery-result-port', type=str, help='Celery result backend port (defaults to celery broker port)')
self.add_argument('--celery-result-db', type=int, help='Celery result backend db (defaults to celery broker db)')
self.add_argument('--celery-no-result', action='store_true', help='Disable the Celery results backend')
self.add_argument('-q', '--celery-queue', dest='celery_queue', type=str, default='cic-eth', help='Task queue')
if local_arg_flags & CICFlag.SYNCER:
self.add_argument('--offset', type=int, help='Start block height for initial history sync')
self.add_argument('--no-history', action='store_true', dest='no_history', help='Skip initial history sync')
if local_arg_flags & CICFlag.CHAIN:
self.add_argument('-r', '--registry-address', type=str, dest='registry_address', help='CIC registry contract address')

View File

@ -24,8 +24,8 @@ class CICFlag(enum.IntEnum):
# sync - nibble 4
SYNCER = 4096
argflag_local_task = CICFlag.CELERY
argflag_local_base = argflag_std_base | Flag.CHAIN_SPEC
argflag_local_task = CICFlag.CELERY
argflag_local_taskcallback = argflag_local_task | CICFlag.REDIS | CICFlag.REDIS_CALLBACK
argflag_local_chain = CICFlag.CHAIN
argflag_local_sync = CICFlag.SYNCER | CICFlag.CHAIN

View File

@ -1,12 +1,18 @@
# standard imports
import os
import logging
import urllib.parse
import copy
# external imports
from chainlib.eth.cli import (
Config as BaseConfig,
Flag,
)
from urlybird.merge import (
urlhostmerge,
urlmerge,
)
# local imports
from .base import CICFlag
@ -40,6 +46,7 @@ class Config(BaseConfig):
if local_arg_flags & CICFlag.CHAIN:
local_args_override['CIC_REGISTRY_ADDRESS'] = getattr(args, 'registry_address')
if local_arg_flags & CICFlag.CELERY:
local_args_override['CELERY_QUEUE'] = getattr(args, 'celery_queue')
@ -49,15 +56,61 @@ class Config(BaseConfig):
config.dict_override(local_args_override, 'local cli args')
if local_arg_flags & CICFlag.REDIS_CALLBACK:
config.add(getattr(args, 'redis_host_callback'), '_REDIS_HOST_CALLBACK')
config.add(getattr(args, 'redis_port_callback'), '_REDIS_PORT_CALLBACK')
local_celery_args_override = {}
if local_arg_flags & CICFlag.CELERY:
hostport = urlhostmerge(
None,
config.get('REDIS_HOST'),
config.get('REDIS_PORT'),
)
redis_url = (
'redis',
hostport,
getattr(args, 'redis_db', None),
)
celery_config_url = urllib.parse.urlsplit(config.get('CELERY_BROKER_URL'))
hostport = urlhostmerge(
celery_config_url[1],
getattr(args, 'celery_host', None),
getattr(args, 'celery_port', None),
)
celery_arg_url = (
getattr(args, 'celery_scheme', None),
hostport,
getattr(args, 'celery_db', None),
)
celery_url = urlmerge(redis_url, celery_config_url, celery_arg_url)
celery_url_string = urllib.parse.urlunsplit(celery_url)
local_celery_args_override['CELERY_BROKER_URL'] = celery_url_string
if not getattr(args, 'celery_no_result'):
local_celery_args_override['CELERY_RESULT_URL'] = config.get('CELERY_RESULT_URL')
if local_celery_args_override['CELERY_RESULT_URL'] == None:
local_celery_args_override['CELERY_RESULT_URL'] = local_celery_args_override['CELERY_BROKER_URL']
celery_config_url = urllib.parse.urlsplit(local_celery_args_override['CELERY_RESULT_URL'])
hostport = urlhostmerge(
celery_config_url[1],
getattr(args, 'celery_result_host', None),
getattr(args, 'celery_result_port', None),
)
celery_arg_url = (
getattr(args, 'celery_result_scheme', None),
hostport,
getattr(args, 'celery_result_db', None),
)
celery_url = urlmerge(celery_config_url, celery_arg_url)
logg.debug('celery url {} {}'.format(celery_config_url, celery_url))
celery_url_string = urllib.parse.urlunsplit(celery_url)
local_celery_args_override['CELERY_RESULT_URL'] = celery_url_string
config.add(config.true('CELERY_DEBUG'), 'CELERY_DEBUG', exists_ok=True)
config.dict_override(local_celery_args_override, 'local celery cli args')
if local_arg_flags & CICFlag.REDIS_CALLBACK:
redis_host_callback = getattr(args, 'redis_host_callback', config.get('REDIS_HOST'))
redis_port_callback = getattr(args, 'redis_port_callback', config.get('REDIS_PORT'))
config.add(redis_host_callback, '_REDIS_HOST_CALLBACK')
config.add(redis_port_callback, '_REDIS_PORT_CALLBACK')
logg.debug('config loaded:\n{}'.format(config))
return config

View File

@ -1,5 +1,5 @@
[celery]
broker_url = redis://localhost:6379
broker_url =
result_url =
queue = cic-eth
debug = 0

View File

@ -1,3 +1,6 @@
# standard imports
import logging
# external imports
from chainlib.eth.gas import RPCGasOracle
from hexathon import strip_0x
@ -7,9 +10,11 @@ from cic_eth.db.models.gas_cache import GasCache
from cic_eth.encode import tx_normalize
from cic_eth.db.models.base import SessionBase
MAXIMUM_FEE_UNITS = 8000000
logg = logging.getLogger(__name__)
class MaxGasOracle(RPCGasOracle):
def get_fee_units(self, code=None):

View File

@ -18,7 +18,7 @@ from cic_eth.api import Api
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger('create_account_script')
arg_flags = cic_eth.cli.argflag_std_base
arg_flags = cic_eth.cli.argflag_local_base
local_arg_flags = cic_eth.cli.argflag_local_taskcallback
argparser = cic_eth.cli.ArgumentParser(arg_flags)
argparser.add_argument('--token-symbol', dest='token_symbol', type=str, help='Token symbol')

View File

@ -46,27 +46,6 @@ argparser.process_local_flags(local_arg_flags)
args = argparser.parse_args()
#default_config_dir = os.environ.get('CONFINI_DIR', '/usr/local/etc/cic')
#argparser = argparse.ArgumentParser()
#argparser.add_argument('-p', '--provider', dest='p', default='http://localhost:8545', type=str, help='Web3 provider url (http only)')
#argparser.add_argument('-r', '--registry-address', dest='r', type=str, help='CIC registry address')
#argparser.add_argument('-f', '--format', dest='f', default=default_format, type=str, help='Output format')
#argparser.add_argument('--status-raw', dest='status_raw', action='store_true', help='Output status bit enum names only')
#argparser.add_argument('-c', type=str, default=default_config_dir, help='config root to use')
#argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='chain spec')
#argparser.add_argument('-q', type=str, default='cic-eth', help='celery queue to submit transaction tasks to')
#argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
#argparser.add_argument('-v', action='store_true', help='Be verbose')
#argparser.add_argument('-vv', help='be more verbose', action='store_true')
#argparser.add_argument('query', type=str, help='Transaction, transaction hash, account or "lock"')
#args = argparser.parse_args()
#if args.v == True:
# logging.getLogger().setLevel(logging.INFO)
#elif args.vv == True:
# logging.getLogger().setLevel(logging.DEBUG)
#
extra_args = {
'f': '_FORMAT',
'query': '_QUERY',
@ -74,38 +53,10 @@ extra_args = {
config = cic_eth.cli.Config.from_args(args, arg_flags, local_arg_flags, extra_args=extra_args)
celery_app = cic_eth.cli.CeleryApp.from_config(config)
#config_dir = os.path.join(args.c)
#os.makedirs(config_dir, 0o777, True)
#config = confini.Config(config_dir, args.env_prefix)
#config.process()
#args_override = {
# 'ETH_PROVIDER': getattr(args, 'p'),
# 'CIC_CHAIN_SPEC': getattr(args, 'i'),
# 'CIC_REGISTRY_ADDRESS': getattr(args, 'r'),
# }
## override args
#config.dict_override(args_override, 'cli args')
#config.censor('PASSWORD', 'DATABASE')
#config.censor('PASSWORD', 'SSL')
#logg.debug('config loaded from {}:\n{}'.format(config_dir, config))
#
#try:
# config.add(add_0x(args.query), '_QUERY', True)
#except:
# config.add(args.query, '_QUERY', True)
#celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL'))
queue = config.get('CELERY_QUEUE')
chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC'))
#rpc = EthHTTPConnection(args.p)
#registry_address = config.get('CIC_REGISTRY_ADDRESS')
# connect to celery
celery_app = cic_eth.cli.CeleryApp.from_config(config)
@ -194,16 +145,11 @@ def main():
pass
if len(query) > 64:
#registry = connect_registry(rpc, chain_spec, registry_address)
#admin_api.tx(chain_spec, tx_raw=config.get('_QUERY'), registry=registry, renderer=renderer)
admin_api.tx(chain_spec, tx_raw=query, renderer=renderer)
elif len(query) > 40:
#registry = connect_registry(rpc, chain_spec, registry_address)
#admin_api.tx(chain_spec, tx_hash=config.get('_QUERY'), registry=registry, renderer=renderer)
admin_api.tx(chain_spec, tx_hash=query, renderer=renderer)
elif len(query) == 40:
#registry = connect_registry(rpc, chain_spec, registry_address)
txs = admin_api.account(chain_spec, query, include_recipient=False, renderer=render_account)
renderer = render_account
elif len(query) >= 4 and query[:4] == 'lock':

View File

@ -1,4 +1,4 @@
celery==4.4.7
chainlib-eth>=0.0.10a16,<0.1.0
chainlib-eth>=0.0.10a20,<0.1.0
semver==2.13.0
crypto-dev-signer>=0.4.15rc2,<0.5.0
urlybird~=0.0.1a2

View File

@ -1,7 +1,7 @@
[metadata]
name = cic-eth
#version = attr: cic_eth.version.__version_string__
version = 0.12.4a14
version = 0.12.4a15
description = CIC Network Ethereum interaction
author = Louis Holbrook
author_email = dev@holbrook.no