cic-internal-integration/apps/cic-eth/examples/api_callback.py

77 lines
1.7 KiB
Python
Raw Permalink Normal View History

2021-02-01 18:12:51 +01:00
# standard imports
import logging
import uuid
import socket
# third-party imports
import celery
import redis
# local imports
import cic_eth
# Send everything from DEBUG upwards through the root logger.
logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger()

# Celery application talking to a redis broker (no host given in the URL).
celery_app = celery.Celery(broker='redis://')

# One random id per run. It is reused as the celery queue name, as the
# worker node name and as the prefix of the redis key collecting results.
uu = uuid.uuid4()

# Redis client created with the library's default connection settings.
r = redis.Redis()
r_key = '{}-results'.format(uu)

# Number of pings to send, and therefore the number of callbacks to wait for.
c = 3

# Random string ids, one per ping.
results = []
# NOTE: `i` is deliberately left as a module-level loop variable, since
# custom_callback reads it in its debug log line.
for i in range(c):
    x = uuid.uuid4()
    results.append(str(x))
@celery_app.task(queue=str(uu), name='customcustom')
def custom_callback(result_from_chain, static_param_from_api, status_code_from_chain=0, message_from_chain=None):
    """Record one callback result and shut the worker down after the last one.

    Each invocation pushes ``result_from_chain`` onto the redis list at
    ``r_key``. Once the list holds ``c`` entries, the list is deleted and a
    ``shutdown`` control command is broadcast to this run's worker node.

    The decorator previously passed ``names=`` (a typo), so the explicit task
    name was never applied; it now uses ``name=`` like ``custom_run`` does.

    :param result_from_chain: Value pushed to the redis result list.
    :param static_param_from_api: Logged only (labelled ``url`` in the log).
    :param status_code_from_chain: Logged only; defaults to 0.
    :param message_from_chain: Logged only; an empty dict is used when omitted.
    """
    # Avoid a shared mutable default while still logging `{}` when omitted.
    if message_from_chain is None:
        message_from_chain = {}

    r.rpush(r_key, result_from_chain)
    received = r.llen(r_key)

    # `i` is the module-level loop variable left over from building `results`;
    # it is not a per-callback counter.
    logg.debug('i {} l {} result {} url {} statuscode {} message {}'.format(
        i,
        received,
        result_from_chain,
        static_param_from_api,
        status_code_from_chain,
        message_from_chain,
        )
    )

    # All expected callbacks have arrived: clean up and stop the worker.
    if received == c:
        r.delete(r_key)
        celery_app.control.broadcast(
            'shutdown',
            destination=['{}@{}'.format(uu, socket.gethostname())],
        )
@celery_app.task(queue=str(uu), name='dodorunrunrun')
def custom_run(results):
    """Send one ``ping`` through the cic_eth API for every entry in *results*.

    A new ``cic_eth.Api`` object is created for each entry, with ``uu`` as
    the callback parameter and ``custom_callback`` as the callback task.

    :param results: Iterable of values, each passed to ``Api.ping``.
    """
    for ping_id in results:
        logg.debug('sending {}'.format(ping_id))
        api = cic_eth.Api(callback_param=uu, callback_task=custom_callback)
        api.ping(ping_id)
if __name__ == '__main__':
    # Drop any result list left under this run's key before starting.
    r.delete(r_key)

    queue_name = str(uu)

    # Queue the task that sends the pings; it runs once the worker below
    # starts consuming from the same queue.
    s = celery.signature('dodorunrunrun', [results], queue=queue_name)
    s.apply_async()

    # Worker arguments: node name and queue are both derived from `uu`.
    # NOTE(review): `%h` is presumably expanded by celery to the hostname,
    # matching the shutdown destination built in custom_callback -- confirm.
    worker_argv = [
        'api_callback',
        '--loglevel=DEBUG',
        '-n',
        '{}@%h'.format(uu),
        '-Q',
        queue_name,
    ]
    worker = celery_app.worker_main(worker_argv)