130 lines
4.6 KiB
Python
130 lines
4.6 KiB
Python
"""
|
|
Contains all handlers for valid routes for the server
|
|
"""
|
|
|
|
# standard imports
|
|
import logging
|
|
from typing import Any, Callable, Dict, List, Type, Union
|
|
|
|
from cic_eth.server import cache, celery, converters, uwsgi
|
|
from cic_eth.server.openapi.UWSGIOpenAPIRequest import UWSGIOpenAPIRequest
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
def handle_transactions(start_response, query: dict) -> List[bytes]:
    """Respond with the result of ``celery.call('list', ...)`` for an address.

    ``address`` is removed from ``query`` (the caller's dict is mutated) and
    passed positionally; whatever remains in ``query`` is forwarded as
    keyword arguments.

    Raises:
        KeyError: if ``query`` has no ``address`` entry.
    """
    account = query.pop('address')
    listing = celery.call('list', account, **query)
    return uwsgi.respond(start_response, listing)
|
|
|
|
|
|
def handle_balance(start_response, query: dict) -> List[bytes]:
    """Respond with an address's balances for one token.

    ``address`` and ``token_symbol`` are removed from ``query`` (the caller's
    dict is mutated); the remaining entries are forwarded to
    ``celery.call('balance', ...)`` as keyword arguments. Each entry of the
    result has ``balance_network``, ``balance_incoming`` and
    ``balance_outgoing`` passed through ``converters.from_wei`` with the
    token's ``decimals``, and gains a ``balance_available`` field.

    Raises:
        KeyError: if ``address`` or ``token_symbol`` is missing from ``query``.
    """
    address = query.pop('address')
    token_symbol = query.pop('token_symbol')
    log.info(f"token_symbol: {token_symbol}")
    log.info(f"query: {query}")
    balances = celery.call('balance', address, token_symbol, **query)

    # The token is the same for every entry, so its decimals are looked up at
    # most once rather than once per entry. The lookup stays lazy so that an
    # empty result still triggers no token lookup at all.
    decimals = None
    for entry in balances:
        if decimals is None:
            decimals = get_token_data(token_symbol).get('decimals')

        for field in ('balance_network', 'balance_incoming',
                      'balance_outgoing'):
            entry[field] = converters.from_wei(decimals, int(entry[field]))

        # NOTE(review): this sum is taken over the already converted values
        # after int(); if from_wei returns a fractional amount the fraction
        # is dropped here -- confirm that this is intended.
        entry['balance_available'] = (
            int(entry['balance_network'])
            + int(entry['balance_incoming'])
            - int(entry['balance_outgoing'])
        )
    return uwsgi.respond(start_response, balances)
|
|
|
|
|
|
def handle_create_account(start_response, query: dict) -> List[bytes]:
    """Respond with the result of ``celery.call('create_account', ...)``.

    Every entry of ``query`` is forwarded as a keyword argument.
    """
    return uwsgi.respond(
        start_response,
        celery.call('create_account', **query),
    )
|
|
|
|
|
|
# def handle_refill_gas(start_response, query: dict) -> List[bytes]:
|
|
# address = query.pop('address')
|
|
# data = celery.call('refill_gas', address)
|
|
# return uwsgi.respond(start_response, data)
|
|
|
|
|
|
# def handle_ping(start_response, query: dict) -> List[bytes]:
|
|
# data = celery.call('ping', **query)
|
|
# return uwsgi.respond(start_response, data)
|
|
|
|
|
|
def handle_transfer(start_response, query: dict) -> List[bytes]:
    """Respond with the result of ``celery.call('transfer', ...)``.

    ``from_address``, ``to_address``, ``value`` and ``token_symbol`` are
    removed from ``query`` (the caller's dict is mutated). ``value`` is cast
    to ``int`` and passed through ``converters.to_wei`` with the token's
    ``decimals`` before the call. Any other ``query`` entries are not
    forwarded.

    Raises:
        KeyError: if one of the four required entries is missing.
        ValueError: if ``value`` cannot be converted by ``int()``.
    """
    sender = query.pop('from_address')
    recipient = query.pop('to_address')
    amount = query.pop('value')
    symbol = query.pop('token_symbol')

    decimals = get_token_data(symbol).get('decimals')
    amount_wei = converters.to_wei(decimals, int(amount))

    result = celery.call('transfer', sender, recipient, amount_wei, symbol)
    return uwsgi.respond(start_response, result)
|
|
|
|
|
|
def handle_transfer_from(start_response, query: dict) -> List[bytes]:
    """Respond with the result of ``celery.call('transfer_from', ...)``.

    ``from_address``, ``to_address``, ``value``, ``token_symbol`` and
    ``spender_address`` are removed from ``query`` (the caller's dict is
    mutated). ``value`` is cast to ``int`` and passed through
    ``converters.to_wei`` with the token's ``decimals`` before the call.
    Any other ``query`` entries are not forwarded.

    Raises:
        KeyError: if one of the five required entries is missing.
        ValueError: if ``value`` cannot be converted by ``int()``.
    """
    sender = query.pop('from_address')
    recipient = query.pop('to_address')
    amount = query.pop('value')
    symbol = query.pop('token_symbol')
    spender = query.pop('spender_address')

    decimals = get_token_data(symbol).get('decimals')
    amount_wei = converters.to_wei(decimals, int(amount))

    result = celery.call(
        'transfer_from', sender, recipient, amount_wei, symbol, spender)
    return uwsgi.respond(start_response, result)
|
|
|
|
|
|
def handle_token(start_response, query: dict) -> List[bytes]:
    """Respond with the data for one token, using the cache when possible.

    ``token_symbol`` is removed from ``query`` (the caller's dict is
    mutated). On a cache miss the data is fetched with
    ``celery.call('token', ...)``, forwarding the remaining ``query`` entries
    as keyword arguments, and then stored in the cache.

    Raises:
        KeyError: if ``query`` has no ``token_symbol`` entry.
    """
    token_symbol = query.pop('token_symbol')
    data = cache.get_token_data(token_symbol)
    # Identity check: only a real None counts as a cache miss.
    if data is None:
        data = celery.call('token', token_symbol, **query)
        cache.set_token_data(token_symbol, data)
    return uwsgi.respond(start_response, data)
|
|
|
|
|
|
def handle_tokens(start_response, query: dict) -> List[bytes]:
    """Respond with the result of ``celery.call('tokens', ...)``.

    ``token_symbols`` is removed from ``query`` (the caller's dict is
    mutated) and passed positionally; whatever remains in ``query`` is
    forwarded as keyword arguments.

    Raises:
        KeyError: if ``query`` has no ``token_symbols`` entry.
    """
    symbols = query.pop('token_symbols')
    result = celery.call('tokens', symbols, **query)
    return uwsgi.respond(start_response, result)
|
|
|
|
|
|
def handle_default_token(start_response, _query) -> List[bytes]:
    """Respond with the default token, using the cache when possible.

    On a cache miss the token is fetched with
    ``celery.call('default_token')`` and stored twice: as the default token,
    and as token data under its own ``token_symbol``. ``_query`` is unused.
    """
    data = cache.get_default_token()
    # Identity check: only a real None counts as a cache miss.
    if data is None:
        data = celery.call('default_token')
        cache.set_default_token(data)
        cache.set_token_data(data.get('token_symbol'), data)
    return uwsgi.respond(start_response, data)
|
|
|
|
|
|
def get_token_data(token_symbol: str) -> Dict[str, Any]:
    """Return the first item of the token data for ``token_symbol``.

    The cache is consulted first; on a miss the data is fetched with
    ``celery.call('token', token_symbol)`` and stored in the cache. Callers
    in this module read ``decimals`` from the returned value with ``.get()``,
    so it is presumably a dict -- confirm against the ``token`` task.

    NOTE(review): ``handle_default_token`` stores its result through
    ``cache.set_token_data`` as well, and accesses that value with ``.get()``;
    confirm that such an entry can also be indexed with ``[0]`` here.
    """
    data = cache.get_token_data(token_symbol)
    log.debug(f"cached token data: {data}")
    # Identity check: only a real None counts as a cache miss.
    if data is None:
        data = celery.call('token', token_symbol)
        log.debug(
            f"No token data setting token data for: {token_symbol} to {data}")
        cache.set_token_data(token_symbol, data)
    # TODO What is the second item in the list
    return data[0]
|
|
|
|
|
|
# Signature shared by every handler below: (start_response, query) -> body.
_RouteHandler = Callable[[Any, Union[None, dict]], List[bytes]]

# Route table: maps each route path to the handler that serves it.
routes: Dict[str, _RouteHandler] = {
    # Accounts
    "/transactions": handle_transactions,
    "/balance": handle_balance,
    "/create_account": handle_create_account,
    # Transfers
    "/transfer": handle_transfer,
    "/transfer_from": handle_transfer_from,
    # Token metadata
    "/token": handle_token,
    "/tokens": handle_tokens,
    "/default_token": handle_default_token,
}
|