cic-internal-integration/apps/cic-ussd/cic_ussd/account/tokens.py

129 lines
4.0 KiB
Python

# standard imports
import hashlib
import json
import logging
from typing import Dict, Optional
# external imports
from cic_eth.api import Api
from cic_types.condiments import MetadataPointer
# local imports
from cic_ussd.account.chain import Chain
from cic_ussd.cache import cache_data_key, get_cached_data
from cic_ussd.error import CachedDataNotFoundError, SeppukuError
from cic_ussd.metadata.tokens import query_token_info, query_token_metadata
logg = logging.getLogger(__name__)
def collate_token_metadata(token_info: dict, token_metadata: dict) -> dict:
"""
:param token_info:
:type token_info:
:param token_metadata:
:type token_metadata:
:return:
:rtype:
"""
description = token_info.get('description')
issuer = token_info.get('issuer')
location = token_metadata.get('location')
contact = token_metadata.get('contact')
return {
'description': description,
'issuer': issuer,
'location': location,
'contact': contact
}
def get_cached_default_token(chain_str: str) -> Optional[str]:
"""This function attempts to retrieve the default token's data from the redis cache.
:param chain_str: chain name and network id.
:type chain_str: str
:return:
:rtype:
"""
logg.debug(f'Retrieving default token from cache for chain: {chain_str}')
key = cache_data_key(identifier=chain_str.encode('utf-8'), salt=MetadataPointer.TOKEN_DEFAULT)
return get_cached_data(key=key)
def get_default_token_symbol():
"""This function attempts to retrieve the default token's symbol from cached default token's data.
:raises SeppukuError: The system should terminate itself because the default token is required for an appropriate
system state.
:return: Default token's symbol.
:rtype: str
"""
chain_str = Chain.spec.__str__()
cached_default_token = get_cached_default_token(chain_str)
if cached_default_token:
default_token_data = json.loads(cached_default_token)
return default_token_data.get('symbol')
else:
logg.warning('Cached default token data not found. Attempting retrieval from default token API')
default_token_data = query_default_token(chain_str)
if default_token_data:
return default_token_data.get('symbol')
else:
raise SeppukuError(f'Could not retrieve default token for: {chain_str}')
def hashed_token_proof(token_proof: dict) -> str:
"""
:param token_proof:
:type token_proof:
:return:
:rtype:
"""
hash_object = hashlib.new("sha256")
hash_object.update(json.dumps(token_proof))
return hash_object.digest().hex()
def process_token_data(token_symbol: str):
"""
:param token_symbol:
:type token_symbol:
:return:
:rtype:
"""
identifier = token_symbol.encode('utf-8')
query_token_metadata(identifier=identifier)
token_info = query_token_info(identifier=identifier)
hashed_token_info = hashed_token_proof(token_proof=token_info)
query_token_data(hashed_proofs=[hashed_token_info], token_symbols=[token_symbol])
def query_default_token(chain_str: str):
"""This function synchronously queries cic-eth for the deployed system's default token.
:param chain_str: Chain name and network id.
:type chain_str: str
:return: Token's data.
:rtype: dict
"""
logg.debug(f'Querying API for default token on chain: {chain_str}')
cic_eth_api = Api(chain_str=chain_str)
default_token_request_task = cic_eth_api.default_token()
return default_token_request_task.get()
def query_token_data(hashed_proofs: list, token_symbols: list):
"""
:param hashed_proofs:
:type hashed_proofs:
:param token_symbols:
:type token_symbols:
:return:
:rtype:
"""
api = Api(callback_param='',
callback_queue='cic-ussd',
chain_str=Chain.spec.__str__(),
callback_task='cic_ussd.tasks.callback_handler.token_data_callback')
api.tokens(token_symbols=token_symbols, proof=hashed_proofs)