cic-internal-integration/apps/cic-eth/cic_eth/server/routes.py

130 lines
4.6 KiB
Python
Raw Normal View History

2021-12-02 13:07:49 +01:00
"""
Contains all handlers for valid routes for the server
"""
# standard imports
import logging
from typing import Any, Callable, Dict, List, Type, Union
from cic_eth.server import cache, celery, converters, uwsgi
from cic_eth.server.openapi.UWSGIOpenAPIRequest import UWSGIOpenAPIRequest
log = logging.getLogger(__name__)
def handle_transactions(start_response, query: dict) -> List[bytes]:
address = query.pop('address')
data = celery.call('list', address, **query)
return uwsgi.respond(start_response, data)
def handle_balance(start_response, query: dict) -> List[bytes]:
address = query.pop('address')
token_symbol = query.pop('token_symbol')
log.info(f"token_symbol: {token_symbol}")
log.info(f"query: {query}")
data = celery.call('balance', address, token_symbol,
**query)
for b in data:
token_data = get_token_data(token_symbol)
b['balance_network'] = converters.from_wei(token_data.get(
'decimals'), int(b['balance_network']))
b['balance_incoming'] = converters.from_wei(token_data.get(
'decimals'), int(b['balance_incoming']))
b['balance_outgoing'] = converters.from_wei(token_data.get(
'decimals'), int(b['balance_outgoing']))
b.update({
"balance_available": int(b['balance_network']) + int(b['balance_incoming']) - int(b['balance_outgoing'])
})
return uwsgi.respond(start_response, data)
def handle_create_account(start_response, query: dict) -> List[bytes]:
data = celery.call('create_account', **query)
return uwsgi.respond(start_response, data)
2021-12-08 12:32:56 +01:00
# def handle_refill_gas(start_response, query: dict) -> List[bytes]:
# address = query.pop('address')
# data = celery.call('refill_gas', address)
# return uwsgi.respond(start_response, data)
2021-12-02 13:07:49 +01:00
2021-12-08 12:32:56 +01:00
# def handle_ping(start_response, query: dict) -> List[bytes]:
# data = celery.call('ping', **query)
# return uwsgi.respond(start_response, data)
2021-12-02 13:07:49 +01:00
def handle_transfer(start_response, query: dict) -> List[bytes]:
from_address = query.pop('from_address')
to_address = query.pop('to_address')
value = query.pop('value')
token_symbol = query.pop('token_symbol')
wei_value = converters.to_wei(get_token_data(
token_symbol).get('decimals'), int(value))
data = celery.call('transfer', from_address,
to_address, wei_value, token_symbol)
return uwsgi.respond(start_response, data)
def handle_transfer_from(start_response, query: dict) -> List[bytes]:
from_address = query.pop('from_address')
to_address = query.pop('to_address')
value = query.pop('value')
token_symbol = query.pop('token_symbol')
spender_address = query.pop('spender_address')
wei_value = converters.to_wei(get_token_data(
token_symbol).get('decimals'), int(value))
data = celery.call('transfer_from', from_address, to_address,
wei_value, token_symbol, spender_address)
return uwsgi.respond(start_response, data)
def handle_token(start_response, query: dict) -> List[bytes]:
token_symbol = query.pop('token_symbol')
data = cache.get_token_data(token_symbol)
if data == None:
data = celery.call('token', token_symbol, **query)
cache.set_token_data(token_symbol, data)
return uwsgi.respond(start_response, data)
def handle_tokens(start_response, query: dict) -> List[bytes]:
token_symbols = query.pop('token_symbols')
data = celery.call('tokens', token_symbols, **query)
return uwsgi.respond(start_response, data)
def handle_default_token(start_response, _query) -> List[bytes]:
data = cache.get_default_token()
if data == None:
data = celery.call('default_token')
cache.set_default_token(data)
cache.set_token_data(data.get('token_symbol'), data)
return uwsgi.respond(start_response, data)
def get_token_data(token_symbol: str) -> List[bytes]:
data = cache.get_token_data(token_symbol)
log.debug(f"cached token data: {data}")
if data == None:
data = celery.call('token', token_symbol)
log.debug(
f"No token data setting token data for: {token_symbol} to {data}")
cache.set_token_data(token_symbol, data)
# TODO What is the second item in the list
return data[0]
routes: Dict[str, Callable[[Any, Union[None, dict]], List[bytes]]] = {
"/transactions": handle_transactions,
"/balance": handle_balance,
"/create_account": handle_create_account,
"/transfer": handle_transfer,
"/transfer_from": handle_transfer_from,
"/token": handle_token,
"/tokens": handle_tokens,
"/default_token": handle_default_token,
}