cic-internal-integration/apps/cic-eth/cic_eth/server/celery.py

61 lines
1.6 KiB
Python
Raw Normal View History

2021-11-16 14:37:16 +01:00
import json
import logging
import sys
import uuid
2021-11-19 14:50:13 +01:00
import cic_eth.cli
2021-11-16 14:37:16 +01:00
import redis
from cic_eth.api.api_task import Api
from cic_eth.server.config import config
log = logging.getLogger(__name__)
2021-11-19 14:50:13 +01:00
celery_app = cic_eth.cli.CeleryApp.from_config(config)
celery_app.set_default()
2021-11-16 14:37:16 +01:00
chain_spec = config.get('CHAIN_SPEC')
celery_queue = config.get('CELERY_QUEUE')
redis_host = config.get('REDIS_HOST')
redis_port = config.get('REDIS_PORT')
redis_db = config.get('REDIS_DB')
2021-11-19 14:50:13 +01:00
def call(method, *args, **kwargs):
2021-11-16 14:37:16 +01:00
""" Creates a redis channel and calls `cic_eth.api` with the provided `method` and `*args`. Returns the result of the api call
"""
redis_channel = str(uuid.uuid4())
r = redis.Redis(redis_host, redis_port, redis_db)
ps = r.pubsub()
ps.subscribe(redis_channel)
api = Api(
chain_spec,
queue=celery_queue,
callback_param='{}:{}:{}:{}'.format(
redis_host, redis_port, redis_db, redis_channel),
callback_task='cic_eth.callbacks.redis.redis',
callback_queue=celery_queue,
)
2021-11-23 13:17:13 +01:00
getattr(api, method)(*args, **kwargs)
2021-11-19 14:50:13 +01:00
2021-11-16 14:37:16 +01:00
ps.get_message()
try:
o = ps.get_message(timeout=config.get('REDIS_TIMEOUT'))
except TimeoutError as e:
sys.stderr.write(
2021-12-02 13:07:49 +01:00
f"cic_eth.api.{method}({args}, {kwargs}) timed out:\n {e}")
2021-12-08 13:05:17 +01:00
return None
2021-11-16 14:37:16 +01:00
2021-11-23 13:17:13 +01:00
log.debug(
2021-12-02 13:07:49 +01:00
f"cic_eth.api.{method}(args={args}, kwargs={kwargs})\n {o}")
2021-11-17 13:21:23 +01:00
2021-11-16 14:37:16 +01:00
ps.unsubscribe()
try:
result = json.loads(o['data'])["result"]
return result
except Exception as e:
sys.stderr.write(
f'Unable to parse Data:\n{o}\n Error:\n{e}')
2021-11-23 13:17:13 +01:00
return None