cic-internal-integration/apps/cic-eth/cic_eth/runnable/daemons/server.py

109 lines
3.6 KiB
Python

# standard imports
import json
import logging
from os import path
from urllib.parse import parse_qsl, urlparse
from cic_eth.server.celery_helper import call
from cic_eth.server.UWSGIOpenAPIRequest import UWSGIOpenAPIRequest
from openapi_core import create_spec
from openapi_core.validation.request.validators import RequestValidator
from openapi_spec_validator.schemas import read_yaml_file
spec_dict = read_yaml_file(path.join(path.dirname(
__file__), '../../server/openapi/server.yaml'))
spec = create_spec(spec_dict)
logging.basicConfig(level=logging.WARNING)
log = logging.getLogger()
# TODO Implement wei conversions
# uwsgi application
def application(env, start_response):
# Validate incoming request against the open api spec
oAPIRequest = UWSGIOpenAPIRequest(env)
validator = RequestValidator(spec)
result = validator.validate(oAPIRequest)
if result.errors:
json_data = json.dumps(list(map(lambda e: str(e), result.errors)))
content = json_data.encode('utf-8')
headers = []
headers.append(('Content-Length', str(len(content))),)
headers.append(('Access-Control-Allow-Origin', '*',))
headers.append(('Content-Type', 'application/json',))
start_response('400 Invalid Request', headers)
return [content]
parsed_url = urlparse(env.get('REQUEST_URI'))
path = parsed_url.path
params = dict(parse_qsl(parsed_url.query))
if path == '/transactions':
address = params.pop('address')
print('address', address, )
# address, limit=10
data = call('list', address, **params)
elif path == '/balance':
address = params.pop('address')
token_symbol = params.pop('token_symbol')
data = call('balance', address, token_symbol, **params)
for b in data:
b.update({
"balance_available": int(b['balance_network']) + int(b['balance_incoming']) - int(b['balance_outgoing'])
})
elif path == '/create_account':
data = call('create_account', **params)
elif path == '/refill_gas':
address = params.pop('address')
data = call('refill_gas', address)
elif path == '/ping':
data = call('ping', **params)
elif path == '/transfer':
from_address = params.pop('from_address')
to_address = params.pop('to_address')
value = params.pop('value')
token_symbol = params.pop('token_symbol')
data = call('transfer', from_address, to_address, value, token_symbol)
elif path == '/transfer_from':
from_address = params.pop('from_address')
to_address = params.pop('to_address')
value = params.pop('value')
token_symbol = params.pop('token_symbol')
spender_address = params.pop('spender_address')
data = call('transfer_from', from_address, to_address,
value, token_symbol, spender_address)
elif path == '/token':
token_symbol = params.pop('token_symbol')
data = call('token', token_symbol, **params)
elif path == '/tokens':
token_symbols = params.pop('token_symbols')
data = call('tokens', token_symbols, **params)
elif path == '/default_token':
data = call('default_token')
else:
start_response('404 Seems like you are lost', [])
return []
json_data = json.dumps(data)
content = json_data.encode('utf-8')
headers = []
headers.append(('Content-Length', str(len(content))),)
headers.append(('Access-Control-Allow-Origin', '*',))
headers.append(('Content-Type', 'application/json',))
start_response('200 OK', headers)
return [content]