70 lines
2.0 KiB
Python
70 lines
2.0 KiB
Python
import json
|
|
import logging
|
|
import sys
|
|
import uuid
|
|
|
|
import cic_eth.cli
|
|
import redis
|
|
from cic_eth.api.api_task import Api
|
|
from cic_eth.server.config import config
|
|
|
|
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Build the celery app from the server config, then call set_default() on it.
celery_app = cic_eth.cli.CeleryApp.from_config(config)
celery_app.set_default()

# Settings looked up once at import time and shared by the functions below.
chain_spec, celery_queue = (
    config.get(key) for key in ('CHAIN_SPEC', 'CELERY_QUEUE')
)

# Redis connection parameters (host, port, database).
redis_host, redis_port, redis_db = (
    config.get(key) for key in ('REDIS_HOST', 'REDIS_PORT', 'REDIS_DB')
)
|
|
|
|
|
|
def _next_result(pubsub, timeout):
    """Return the ``"result"`` field of the next message read from *pubsub*.

    ``get_message`` returns ``None`` (it does not raise) when nothing arrives
    within *timeout*, so that case is turned into an explicit ``TimeoutError``.
    """
    message = pubsub.get_message(timeout=timeout)
    if message is None:
        raise TimeoutError(
            f"no callback message received within REDIS_TIMEOUT={timeout}")
    return json.loads(message['data'])["result"]


def call(method, *args, catch=1, **kwargs):
    """Call a ``cic_eth.api`` method and collect its result over redis.

    Subscribes to a freshly generated redis pub/sub channel, invokes the
    ``Api`` attribute named by ``method`` with that channel as the callback
    target, then reads the callback message(s) back from the channel.

    Args:
        method: Name of the ``Api`` attribute to invoke.
        *args: Positional arguments forwarded to the api method.
        catch: Number of callback messages to read before returning.
        **kwargs: Keyword arguments forwarded to the api method.

    Returns:
        When ``catch == 1``, the ``"result"`` field of the single message.
        Otherwise a list holding the ``"result"`` field of each of the
        ``catch`` messages, in the order they were read. ``None`` if a
        message could not be read or decoded (the error is written to
        stderr).

    Raises:
        TimeoutError: If an expected message does not arrive within the
            configured ``REDIS_TIMEOUT``.
    """
    log.debug(f"Using chainspec: {chain_spec}")
    redis_channel = str(uuid.uuid4())
    r = redis.Redis(redis_host, redis_port, redis_db)
    ps = r.pubsub()
    try:
        # Subscribe before dispatching so the callback cannot be published
        # ahead of the subscription.
        ps.subscribe(redis_channel)
        api = Api(
            chain_spec,
            queue=celery_queue,
            callback_param='{}:{}:{}:{}'.format(
                redis_host, redis_port, redis_db, redis_channel),
            callback_task='cic_eth.callbacks.redis.redis',
            callback_queue=celery_queue,
        )
        getattr(api, method)(*args, **kwargs)

        # Drop the first pending message before reading results.
        # NOTE(review): presumably this is the subscribe confirmation; the
        # call is non-blocking, so confirm it is always available here.
        ps.get_message()

        data = []
        try:
            timeout = config.get('REDIS_TIMEOUT')
            if catch == 1:
                data = _next_result(ps, timeout)
            else:
                for _ in range(catch):
                    data.append(_next_result(ps, timeout))
        except TimeoutError as e:
            sys.stderr.write(
                f"cic_eth.api.{method}({args}, {kwargs}) timed out:\n {e}")
            raise
        except Exception as e:
            # Best-effort: report the failure and signal it with None.
            sys.stderr.write(
                f'Unable to parse Data:\n{data}\n Error:\n{e}')
            return None

        log.debug(
            f"cic_eth.api.{method}(args={args}, kwargs={kwargs})\n {data}")

        ps.unsubscribe()
        return data
    finally:
        # Always release the pub/sub connection, including on the error and
        # timeout paths that previously skipped cleanup.
        ps.close()
|