cic-internal-integration/apps/cic-eth/cic_eth/server/celery.py

65 lines
2.1 KiB
Python

import json
import logging
import sys
import uuid
import redis
from cic_eth.api.api_task import Api
# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)
def _read_result(ps, timeout):
    """Return the ``"result"`` field of the next published message on *ps*.

    Subscription control messages (e.g. the ``subscribe`` confirmation) are
    skipped so they are never mistaken for a callback payload.

    Raises:
        TimeoutError: if ``get_message`` returns ``None``, i.e. nothing
            arrived within *timeout*.
    """
    while True:
        message = ps.get_message(timeout=timeout)
        if message is None:
            # get_message signals a timeout by returning None, not by raising.
            raise TimeoutError(
                f"no message received on redis channel within {timeout}s")
        if message.get('type') != 'message':
            continue
        return json.loads(message['data'])["result"]


def create_celery_wrapper(chain_spec,
                          celery_queue,
                          redis_host,
                          redis_port,
                          redis_db,
                          redis_timeout):
    """Build a ``call`` function bound to the given chain and redis settings.

    Args:
        chain_spec: Passed as the first argument to ``Api``.
        celery_queue: Used as both ``queue`` and ``callback_queue`` of ``Api``.
        redis_host: Redis host used for the result channel.
        redis_port: Redis port used for the result channel.
        redis_db: Redis database used for the result channel.
        redis_timeout: Timeout handed to ``get_message`` for each awaited
            message.

    Returns:
        The ``call(method, *args, catch=1, **kwargs)`` closure.
    """
    def call(method, *args, catch=1, **kwargs):
        """Call ``cic_eth.api`` method *method* and return its result(s).

        A fresh redis channel (random UUID) is subscribed to and handed to
        ``Api`` as the callback target; the result is then read back from
        that channel.
        NOTE(review): presumably the ``cic_eth.callbacks.redis.redis`` task
        publishes the JSON payload to that channel -- confirm.

        Args:
            method: Name of the ``Api`` attribute to invoke.
            *args: Positional arguments forwarded to the api method.
            catch: Number of messages to collect before returning.
            **kwargs: Keyword arguments forwarded to the api method.

        Returns:
            The ``"result"`` value of the single message when ``catch == 1``;
            otherwise a list with one ``"result"`` value per caught message.

        Raises:
            TimeoutError: if an expected message does not arrive in time.
            Exception: any other failure while reading or decoding a message
                is reported on stderr and re-raised unchanged.
        """
        log.debug(f"Using redis: {redis_host}, {redis_port}, {redis_db}")
        redis_channel = str(uuid.uuid4())
        r = redis.Redis(redis_host, redis_port, redis_db)
        ps = r.pubsub()
        ps.subscribe(redis_channel)
        data = []
        try:
            api = Api(
                chain_spec,
                queue=celery_queue,
                callback_param='{}:{}:{}:{}'.format(
                    redis_host, redis_port, redis_db, redis_channel),
                callback_task='cic_eth.callbacks.redis.redis',
                callback_queue=celery_queue,
            )
            getattr(api, method)(*args, **kwargs)
            try:
                if catch == 1:
                    data = _read_result(ps, redis_timeout)
                else:
                    for _ in range(catch):
                        data.append(_read_result(ps, redis_timeout))
            except TimeoutError as e:
                sys.stderr.write(
                    f"cic_eth.api.{method}({args}, {kwargs}) timed out:\n {e}")
                raise
            except Exception as e:
                sys.stderr.write(
                    f'Unable to parse Data:\n{data}\n Error:\n{e}')
                raise
            log.debug(
                f"cic_eth.api.{method}(args={args}, kwargs={kwargs})\n {data}")
            return data
        finally:
            # Always drop the one-off channel subscription, even on failure.
            ps.unsubscribe()
    return call