Add celery debug flag, extended results, paths graph scripts stub

This commit is contained in:
nolash 2021-04-25 14:00:42 +02:00
parent 965aeeacb9
commit 425bd184df
Signed by untrusted user who does not match committer: lash
GPG Key ID: 21D2E7BB88C2A746
5 changed files with 67 additions and 7 deletions

View File

@ -121,21 +121,25 @@ broker = config.get('CELERY_BROKER_URL')
if broker[:4] == 'file':
bq = tempfile.mkdtemp()
bp = tempfile.mkdtemp()
current_app.conf.update({
conf_update = {
'broker_url': broker,
'broker_transport_options': {
'data_folder_in': bq,
'data_folder_out': bq,
'data_folder_processed': bp,
},
'result_extended': True,
},
)
}
if config.true('CELERY_DEBUG'):
conf_update['result_extended'] = True
current_app.conf.update(conf_update)
logg.warning('celery broker dirs queue i/o {} processed {}, will NOT be deleted on shutdown'.format(bq, bp))
else:
current_app.conf.update({
conf_update = {
'broker_url': broker,
})
}
if config.true('CELERY_DEBUG'):
conf_update['result_extended'] = True
current_app.conf.update(conf_update)
result = config.get('CELERY_RESULT_URL')
if result[:4] == 'file':

View File

@ -1,3 +1,4 @@
[celery]
broker_url = redis://
result_url = redis://
debug = 0

View File

@ -1,3 +1,4 @@
[celery]
broker_url = redis://localhost:63379
result_url = redis://localhost:63379
debug = 0

View File

@ -0,0 +1,53 @@
# standard imports
import sys
import os
import logging
import json
# external imports
import celery
import confini
# local imports
from cic_eth.api import Api
logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger()
script_dir = os.path.realpath(os.path.dirname(__file__))
config_dir = os.path.join(script_dir, '..', 'config')
config = confini.Config(config_dir, os.environ.get('CONFINI_ENV_PREFIX'))
config.process()
celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL'), result_extended=True)
class Fmtr(celery.utils.graph.GraphFormatter):
def label(self, obj):
super(Fmtr, self).label(obj)
if obj != None:
logg.debug('obj {} attrs'.format(obj.id))
return obj.queue
def main():
api = Api(
config.get('CIC_CHAIN_SPEC'),
queue='cic-eth',
#callback_param='{}:{}:{}:{}'.format(args.redis_host_callback, args.redis_port_callback, redis_db, redis_channel),
#callback_task='cic_eth.callbacks.redis.redis',
#callback_queue=args.q,
)
t = api.create_account(register=False)
for e in t.collect():
print(e[0].backend.get('celery-task-meta-{}'.format(e[0].id)))
print(e[0].name)
#t.build_graph(intermediate=True, formatter=Fmtr()).to_dot(sys.stdout)ka
#v = celery_app.control.inspect().query_task(t.id).objgraph()
#v = celery_app.control.inspect().stats()
if __name__ == '__main__':
main()

View File

@ -247,6 +247,7 @@ services:
BANCOR_DIR: ${BANCOR_DIR:-/usr/local/share/cic/bancor}
CELERY_BROKER_URL: ${CELERY_BROKER_URL:-redis://redis}
CELERY_RESULT_URL: ${CELERY_RESULT_URL:-redis://redis}
CELERY_DEBUG: ${CELERY_DEBUG:-1}
SIGNER_SOCKET_PATH: ${SIGNER_SOCKET_PATH:-ipc:///run/crypto-dev-signer/jsonrpc.ipc}
SIGNER_SECRET: ${SIGNER_SECRET:-deadbeef}
ETH_ACCOUNT_ACCOUNTS_INDEX_WRITER: ${DEV_ETH_ACCOUNT_ACCOUNTS_INDEX_WRITER:-0xACB0BC74E1686D62dE7DC6414C999EA60C09F0eA}