""" Contains all handlers for valid routes for the server """
# standard imports
import logging
from typing import Any, Callable, Dict, List, Type, Union

from cic_eth.server import cache, celery, converters, uwsgi
from cic_eth.server.openapi.UWSGIOpenAPIRequest import UWSGIOpenAPIRequest

log = logging.getLogger(__name__)

# Keys of a balance entry that handle_balance converts with converters.from_wei.
_BALANCE_KEYS = ('balance_network', 'balance_incoming', 'balance_outgoing')


def handle_transactions(start_response, query: dict) -> List[bytes]:
    """Handle ``/transactions``.

    Pops ``address`` from ``query`` and forwards it, plus every remaining
    query entry as keyword arguments, to the ``list`` celery call.

    :param start_response: response callable, passed through to ``uwsgi.respond``
    :param query: request parameters; mutated (``address`` is removed)
    :raises KeyError: if ``address`` is missing from ``query``
    :return: whatever ``uwsgi.respond`` returns for the call result
    """
    address = query.pop('address')
    data = celery.call('list', address, **query)
    return uwsgi.respond(start_response, data)


def handle_balance(start_response, query: dict) -> List[bytes]:
    """Handle ``/balance``.

    Calls the ``balance`` celery call, converts the network / incoming /
    outgoing amounts of every returned entry with ``converters.from_wei``
    using the token's ``decimals``, and adds a ``balance_available`` field.

    :param start_response: response callable, passed through to ``uwsgi.respond``
    :param query: request parameters; mutated (``address`` and
        ``token_symbol`` are removed, the rest is forwarded as keyword arguments)
    :raises KeyError: if ``address`` or ``token_symbol`` is missing
    :return: whatever ``uwsgi.respond`` returns for the converted entries
    """
    address = query.pop('address')
    token_symbol = query.pop('token_symbol')
    log.info(f"token_symbol: {token_symbol}")
    log.info(f"query: {query}")
    data = celery.call('balance', address, token_symbol, **query)

    # The token is the same for every entry, so resolve its decimals once.
    # Resolved lazily so that an empty result still triggers no token lookup.
    decimals = None
    for b in data:
        if decimals is None:
            decimals = get_token_data(token_symbol).get('decimals')
        for key in _BALANCE_KEYS:
            b[key] = converters.from_wei(decimals, int(b[key]))
        # NOTE(review): int() is applied to the already converted amounts, so
        # any fractional part is dropped before the sum - confirm this is the
        # intended rounding for balance_available.
        b.update({
            "balance_available": int(b['balance_network'])
            + int(b['balance_incoming'])
            - int(b['balance_outgoing'])
        })
    return uwsgi.respond(start_response, data)


def handle_create_account(start_response, query: dict) -> List[bytes]:
    """Handle ``/create_account``.

    Forwards every query entry as keyword arguments to the
    ``create_account`` celery call.

    :param start_response: response callable, passed through to ``uwsgi.respond``
    :param query: request parameters
    :return: whatever ``uwsgi.respond`` returns for the call result
    """
    data = celery.call('create_account', **query)
    return uwsgi.respond(start_response, data)


def handle_refill_gas(start_response, query: dict) -> List[bytes]:
    """Handle ``/refill_gas``.

    Only ``address`` is forwarded to the ``refill_gas`` celery call; any
    other query entries are ignored.

    :param start_response: response callable, passed through to ``uwsgi.respond``
    :param query: request parameters; mutated (``address`` is removed)
    :raises KeyError: if ``address`` is missing from ``query``
    :return: whatever ``uwsgi.respond`` returns for the call result
    """
    address = query.pop('address')
    data = celery.call('refill_gas', address)
    return uwsgi.respond(start_response, data)


def handle_ping(start_response, query: dict) -> List[bytes]:
    """Handle ``/ping``.

    Forwards every query entry as keyword arguments to the ``ping`` celery call.

    :param start_response: response callable, passed through to ``uwsgi.respond``
    :param query: request parameters
    :return: whatever ``uwsgi.respond`` returns for the call result
    """
    data = celery.call('ping', **query)
    return uwsgi.respond(start_response, data)


def handle_transfer(start_response, query: dict) -> List[bytes]:
    """Handle ``/transfer``.

    Converts ``value`` with ``converters.to_wei`` using the token's
    ``decimals`` and issues the ``transfer`` celery call.

    :param start_response: response callable, passed through to ``uwsgi.respond``
    :param query: request parameters; mutated (``from_address``,
        ``to_address``, ``value`` and ``token_symbol`` are removed)
    :raises KeyError: if any of the four parameters is missing
    :return: whatever ``uwsgi.respond`` returns for the call result
    """
    from_address = query.pop('from_address')
    to_address = query.pop('to_address')
    value = query.pop('value')
    token_symbol = query.pop('token_symbol')

    decimals = get_token_data(token_symbol).get('decimals')
    wei_value = converters.to_wei(decimals, int(value))

    data = celery.call('transfer', from_address,
                       to_address, wei_value, token_symbol)
    return uwsgi.respond(start_response, data)


def handle_transfer_from(start_response, query: dict) -> List[bytes]:
    """Handle ``/transfer_from``.

    Same as :func:`handle_transfer`, but additionally pops
    ``spender_address`` and issues the ``transfer_from`` celery call.

    :param start_response: response callable, passed through to ``uwsgi.respond``
    :param query: request parameters; mutated (``from_address``,
        ``to_address``, ``value``, ``token_symbol`` and ``spender_address``
        are removed)
    :raises KeyError: if any of the five parameters is missing
    :return: whatever ``uwsgi.respond`` returns for the call result
    """
    from_address = query.pop('from_address')
    to_address = query.pop('to_address')
    value = query.pop('value')
    token_symbol = query.pop('token_symbol')
    spender_address = query.pop('spender_address')

    decimals = get_token_data(token_symbol).get('decimals')
    wei_value = converters.to_wei(decimals, int(value))

    data = celery.call('transfer_from', from_address, to_address,
                       wei_value, token_symbol, spender_address)
    return uwsgi.respond(start_response, data)


def handle_token(start_response, query: dict) -> List[bytes]:
    """Handle ``/token``.

    Serves the token data from the cache when present; otherwise fetches it
    through the ``token`` celery call and stores the result in the cache.

    :param start_response: response callable, passed through to ``uwsgi.respond``
    :param query: request parameters; mutated (``token_symbol`` is removed,
        the rest is forwarded as keyword arguments on a cache miss)
    :raises KeyError: if ``token_symbol`` is missing from ``query``
    :return: whatever ``uwsgi.respond`` returns for the token data
    """
    token_symbol = query.pop('token_symbol')
    data = cache.get_token_data(token_symbol)
    if data is None:
        data = celery.call('token', token_symbol, **query)
        cache.set_token_data(token_symbol, data)
    return uwsgi.respond(start_response, data)


def handle_tokens(start_response, query: dict) -> List[bytes]:
    """Handle ``/tokens``.

    Forwards ``token_symbols`` and the remaining query entries to the
    ``tokens`` celery call. The result is not cached.

    :param start_response: response callable, passed through to ``uwsgi.respond``
    :param query: request parameters; mutated (``token_symbols`` is removed)
    :raises KeyError: if ``token_symbols`` is missing from ``query``
    :return: whatever ``uwsgi.respond`` returns for the call result
    """
    token_symbols = query.pop('token_symbols')
    data = celery.call('tokens', token_symbols, **query)
    return uwsgi.respond(start_response, data)


def handle_default_token(start_response, _query: Union[None, dict]) -> List[bytes]:
    """Handle ``/default_token``.

    Serves the default token from the cache when present; otherwise fetches
    it through the ``default_token`` celery call, stores it as the default
    token and also stores it in the per-symbol token cache under its
    ``token_symbol`` entry.

    :param start_response: response callable, passed through to ``uwsgi.respond``
    :param _query: unused
    :return: whatever ``uwsgi.respond`` returns for the default token data
    """
    data = cache.get_default_token()
    if data is None:
        data = celery.call('default_token')
        cache.set_default_token(data)
        cache.set_token_data(data.get('token_symbol'), data)
    return uwsgi.respond(start_response, data)


def get_token_data(token_symbol: str) -> Dict[str, Any]:
    """Return the data mapping of a token, using the cache when possible.

    On a cache miss the result of the ``token`` celery call is stored in the
    cache as-is and its first item is returned.

    :param token_symbol: symbol of the token to look up
    :return: the token data; callers in this module read ``decimals`` from it
    """
    data = cache.get_token_data(token_symbol)
    log.debug(f"cached token data: {data}")
    if data is None:
        data = celery.call('token', token_symbol)
        log.debug(
            f"No token data setting token data for: {token_symbol} to {data}")
        cache.set_token_data(token_symbol, data)

    # handle_default_token stores its bare mapping in this same cache, so an
    # entry is not necessarily the list produced by the 'token' call.
    if isinstance(data, dict):
        return data

    # TODO What is the second item in the list
    return data[0]


routes: Dict[str, Callable[[Any, Union[None, dict]], List[bytes]]] = {
    "/transactions": handle_transactions,
    "/balance": handle_balance,
    "/create_account": handle_create_account,
    "/refill_gas": handle_refill_gas,
    "/ping": handle_ping,
    "/transfer": handle_transfer,
    "/transfer_from": handle_transfer_from,
    "/token": handle_token,
    "/tokens": handle_tokens,
    "/default_token": handle_default_token,
}