cic-internal-integration/apps/cic-eth/cic_eth/runnable/daemons/server.py

135 lines
4.1 KiB
Python

# imports: standard library, third-party and project-local
import json
import logging
import os
import sys
import uuid
from os import path
from urllib.parse import parse_qsl, urlparse
import cic_eth.cli
import redis
from cic_eth.api.api_task import Api
from cic_eth.server.UWSGIOpenAPIRequest import UWSGIOpenAPIRequest
from openapi_core import create_spec
from openapi_core.validation.request.datatypes import OpenAPIRequest
from openapi_core.validation.request.validators import RequestValidator
from openapi_spec_validator.schemas import read_yaml_file
from werkzeug.wrappers import Request
# Load the OpenAPI description from the YAML file located relative to this
# module and build the spec object that application() validates requests
# against.
spec_dict = read_yaml_file(path.join(path.dirname(
__file__), '../../server/openapi/server.yaml'))
spec = create_spec(spec_dict)
# Configure logging at WARNING level; logg is the root logger (getLogger()
# is called without a name).
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
# Flag sets handed to the cic_eth argument parser and config loader.
# NOTE(review): presumably "standard base" options plus the task-callback
# options -- confirm against cic_eth.cli.
arg_flags = cic_eth.cli.argflag_std_base
local_arg_flags = cic_eth.cli.argflag_local_taskcallback
# NOTE: the command line is parsed here, at import time, so merely importing
# this module runs the argument parser.
argparser = cic_eth.cli.ArgumentParser(arg_flags)
argparser.process_local_flags(local_arg_flags)
args = argparser.parse_args()
# Configuration object read by application() (chain spec, redis and celery
# settings).
config = cic_eth.cli.Config.from_args(args, arg_flags, local_arg_flags)
# celery_app is not referenced again in this file.
# NOTE(review): presumably creating it sets up the Celery connection that
# the Api calls rely on -- verify before removing.
celery_app = cic_eth.cli.CeleryApp.from_config(config)
# Routes whose query parameters are forwarded verbatim as keyword arguments
# to the Api method named by the value.
_KWARG_ROUTES = {
    '/balance': 'balance',
    '/create_account': 'create_account',
    '/ping': 'ping',
    '/transfer': 'transfer',
    '/transfer_from': 'transfer_from',
    '/token': 'token',
    '/tokens': 'tokens',
}

# Every route application() knows how to serve.
_ROUTES = frozenset(_KWARG_ROUTES) | {'/list', '/default_token'}


def _json_response(start_response, status, payload):
    """Serialize *payload* as JSON, start the WSGI response, return its body."""
    content = json.dumps(payload).encode('utf-8')
    headers = [
        ('Content-Length', str(len(content))),
        ('Access-Control-Allow-Origin', '*'),
        ('Content-Type', 'application/json'),
    ]
    start_response(status, headers)
    return [content]


def _dispatch(api, route, params):
    """Call the Api method that serves *route* (which must be in _ROUTES)."""
    if route == '/list':
        # Only the address is forwarded; any other query parameter is ignored.
        api.list(params.get('address'))
    elif route == '/default_token':
        api.default_token()
    else:
        getattr(api, _KWARG_ROUTES[route])(**params)


def _await_callback(pubsub, timeout):
    """Return the first published message on *pubsub*, or None on timeout.

    Subscription confirmations (and any other non-'message' entries) are
    skipped instead of being consumed by blind non-blocking reads, so the
    result can neither be discarded nor confused with a confirmation.

    :param pubsub: subscribed redis PubSub object
    :param timeout: seconds to wait; None is passed through to redis
        unchanged, as the original call did
    """
    import time

    deadline = None if timeout is None else time.monotonic() + float(timeout)
    while True:
        remaining = None
        if deadline is not None:
            remaining = max(deadline - time.monotonic(), 0)
        message = pubsub.get_message(timeout=remaining)
        if message is not None and message.get('type') == 'message':
            return message
        if deadline is not None and time.monotonic() >= deadline:
            return None


# uwsgi application
def application(env, start_response):
    """WSGI entry point: validate the request, run the cic-eth Api call, reply.

    The request is validated against the OpenAPI spec, the matching Api
    method is invoked with a redis callback on a per-request channel, and the
    callback payload is returned to the client as JSON.

    :param env: WSGI environment; the uwsgi-provided REQUEST_URI is used for
        routing and query parameters
    :param start_response: WSGI start_response callable
    :returns: single-element list holding the JSON-encoded response body

    Responses:
        400 - request failed OpenAPI validation (body: list of error strings)
        404 - route is not handled by this server
        502 - callback payload was not valid JSON
        504 - no callback arrived within REDIS_TIMEOUT
        200 - {"path", "request_method", "result"}
    """
    logg.debug('openapi spec: %s', spec)

    request = Request(env)
    result = RequestValidator(spec).validate(UWSGIOpenAPIRequest(request))
    if result.errors:
        return _json_response(
            start_response,
            '400 Invalid Request',
            [str(e) for e in result.errors],
        )

    parsed_url = urlparse(env.get('REQUEST_URI'))
    # Named "route" so the module-level os.path import is not shadowed.
    route = parsed_url.path
    params = dict(parse_qsl(parsed_url.query))
    request_method = env.get('REQUEST_METHOD')

    if route not in _ROUTES:
        # Without this guard no task would be dispatched and the handler
        # would wait the full timeout for a callback that never comes.
        return _json_response(start_response, '404 Not Found', {
            'path': route,
            'request_method': request_method,
            'error': 'unknown path',
        })

    redis_host = config.get('REDIS_HOST')
    redis_port = config.get('REDIS_PORT')
    redis_db = config.get('REDIS_DB')
    celery_queue = config.get('CELERY_QUEUE')

    # A unique channel per request keeps concurrent callbacks apart.
    redis_channel = str(uuid.uuid4())
    r = redis.Redis(redis_host, redis_port, redis_db)
    ps = r.pubsub()
    try:
        # Subscribe before dispatching so the callback cannot be missed.
        ps.subscribe(redis_channel)

        api = Api(
            config.get('CHAIN_SPEC'),
            queue=celery_queue,
            callback_param='{}:{}:{}:{}'.format(
                redis_host, redis_port, redis_db, redis_channel),
            callback_task='cic_eth.callbacks.redis.redis',
            callback_queue=celery_queue,
        )
        _dispatch(api, route, params)

        try:
            message = _await_callback(ps, config.get('REDIS_TIMEOUT'))
        except TimeoutError as e:
            logg.error('no response from cic-eth before timeout: %s', e)
            message = None
        ps.unsubscribe()
    finally:
        # Release the pubsub connection on every path, including errors.
        ps.close()

    if message is None:
        # Reply with an error instead of sys.exit(), which would take the
        # whole uwsgi worker down.
        return _json_response(start_response, '504 Gateway Timeout', {
            'path': route,
            'request_method': request_method,
            'error': 'no response from cic-eth before timeout',
        })

    try:
        m = json.loads(message['data'])
    except (TypeError, ValueError) as e:
        logg.error('invalid callback payload from cic-eth: %s', e)
        return _json_response(start_response, '502 Bad Gateway', {
            'path': route,
            'request_method': request_method,
            'error': 'invalid response from cic-eth',
        })
    logg.debug('callback payload: %s', m)

    return _json_response(start_response, '200 OK', {
        'path': route,
        'request_method': request_method,
        'result': m,
    })