2021-11-16 14:37:16 +01:00
import json
import logging
import sys
import uuid
import redis
from cic_eth . api . api_task import Api
log = logging . getLogger ( __name__ )
2021-12-21 19:15:45 +01:00
2022-01-05 10:16:48 +01:00
def create_celery_wrapper ( chain_spec ,
celery_queue ,
redis_host ,
redis_port ,
redis_db ,
redis_timeout ) :
def call ( method , * args , catch = 1 , * * kwargs ) :
""" Creates a redis channel and calls `cic_eth.api` with the provided `method` and `*args`. Returns the result of the api call. Catch allows you to specify how many messages to catch before returning.
"""
log . debug ( f " Using chainspec: { chain_spec } " )
redis_channel = str ( uuid . uuid4 ( ) )
r = redis . Redis ( redis_host , redis_port , redis_db )
ps = r . pubsub ( )
ps . subscribe ( redis_channel )
api = Api (
chain_spec ,
queue = celery_queue ,
callback_param = ' {} : {} : {} : {} ' . format (
redis_host , redis_port , redis_db , redis_channel ) ,
callback_task = ' cic_eth.callbacks.redis.redis ' ,
callback_queue = celery_queue ,
)
getattr ( api , method ) ( * args , * * kwargs )
ps . get_message ( )
try :
data = [ ]
if catch == 1 :
message = ps . get_message ( timeout = redis_timeout )
data = json . loads ( message [ ' data ' ] ) [ " result " ]
else :
for _i in range ( catch ) :
message = ps . get_message (
timeout = redis_timeout )
result = json . loads ( message [ ' data ' ] ) [ " result " ]
data . append ( result )
except TimeoutError as e :
sys . stderr . write (
f " cic_eth.api. { method } ( { args } , { kwargs } ) timed out: \n { e } " )
raise e
except Exception as e :
sys . stderr . write (
f ' Unable to parse Data: \n { data } \n Error: \n { e } ' )
raise e
log . debug (
f " cic_eth.api. { method } (args= { args } , kwargs= { kwargs } ) \n { data } " )
ps . unsubscribe ( )
return data
return call