# standard imports
import hashlib
import json
import logging
from typing import Dict, Optional

# external imports
from cic_eth.api import Api
from cic_types.condiments import MetadataPointer

# local imports
from cic_ussd.account.chain import Chain
from cic_ussd.cache import cache_data_key, get_cached_data
from cic_ussd.error import CachedDataNotFoundError, SeppukuError
from cic_ussd.metadata.tokens import query_token_info, query_token_metadata

logg = logging.getLogger(__name__)


def collate_token_metadata(token_info: dict, token_metadata: dict) -> dict:
    """Merge selected fields of a token's info and metadata into one dict.

    Missing keys are not an error: each value is read with ``dict.get`` and
    is therefore ``None`` when the source dict does not carry it.

    :param token_info: Source of the ``description`` and ``issuer`` fields.
    :type token_info: dict
    :param token_metadata: Source of the ``location`` and ``contact`` fields.
    :type token_metadata: dict
    :return: Dict with the keys ``description``, ``issuer``, ``location`` and ``contact``.
    :rtype: dict
    """
    return {
        'description': token_info.get('description'),
        'issuer': token_info.get('issuer'),
        'location': token_metadata.get('location'),
        'contact': token_metadata.get('contact'),
    }


def get_cached_default_token(chain_str: str) -> Optional[str]:
    """This function attempts to retrieve the default token's data from the redis cache.

    The cache key is derived from the utf-8 encoded chain string, salted with
    ``MetadataPointer.TOKEN_DEFAULT``.

    :param chain_str: chain name and network id.
    :type chain_str: str
    :return: Whatever ``get_cached_data`` yields for the derived key
        (callers in this module treat a falsy value as a cache miss and a
        truthy value as a JSON document).
    :rtype: str, optional
    """
    logg.debug(f'Retrieving default token from cache for chain: {chain_str}')
    key = cache_data_key(identifier=chain_str.encode('utf-8'), salt=MetadataPointer.TOKEN_DEFAULT)
    return get_cached_data(key=key)


def get_default_token_symbol():
    """This function attempts to retrieve the default token's symbol from cached default token's data.

    The cache is consulted first; on a miss the default token is queried from
    the cic-eth API instead.

    :raises SeppukuError: The system should terminate itself because the default token is required for an appropriate
    system state.
    :return: Default token's symbol.
    :rtype: str
    """
    chain_str = Chain.spec.__str__()

    cached_default_token = get_cached_default_token(chain_str)
    if cached_default_token:
        # The cached entry is a JSON document; only its symbol is needed.
        return json.loads(cached_default_token).get('symbol')

    logg.warning('Cached default token data not found. Attempting retrieval from default token API')
    default_token_data = query_default_token(chain_str)
    if not default_token_data:
        # Neither the cache nor the API produced a default token.
        raise SeppukuError(f'Could not retrieve default token for: {chain_str}')
    return default_token_data.get('symbol')


def hashed_token_proof(token_proof: dict) -> str:
    """Compute the SHA-256 hex digest of a token proof's JSON serialization.

    The proof is serialized with ``json.dumps`` using default settings, so the
    digest depends on the dict's key insertion order.

    :param token_proof: JSON-serializable data to hash.
    :type token_proof: dict
    :return: Hex encoded SHA-256 digest.
    :rtype: str
    """
    # hashlib only accepts bytes-like input; passing the str returned by
    # json.dumps directly raises TypeError, so encode it first.
    serialized_proof = json.dumps(token_proof).encode('utf-8')
    hash_object = hashlib.new("sha256")
    hash_object.update(serialized_proof)
    return hash_object.hexdigest()


def process_token_data(token_symbol: str):
    """Query a token's metadata and info, then request its token data using the hashed info as proof.

    :param token_symbol: Symbol of the token to process.
    :type token_symbol: str
    :return: None
    :rtype: None
    """
    identifier = token_symbol.encode('utf-8')
    # Return value intentionally unused; presumably called for its side
    # effects (e.g. caching) - confirm against cic_ussd.metadata.tokens.
    query_token_metadata(identifier=identifier)
    token_info = query_token_info(identifier=identifier)
    hashed_token_info = hashed_token_proof(token_proof=token_info)
    query_token_data(hashed_proofs=[hashed_token_info], token_symbols=[token_symbol])


def query_default_token(chain_str: str):
    """This function synchronously queries cic-eth for the deployed system's default token.

    :param chain_str: Chain name and network id.
    :type chain_str: str
    :return: Token's data.
    :rtype: dict
    """
    logg.debug(f'Querying API for default token on chain: {chain_str}')
    cic_eth_api = Api(chain_str=chain_str)
    default_token_request_task = cic_eth_api.default_token()
    # Wait on the task's result rather than returning the task itself.
    return default_token_request_task.get()


def query_token_data(hashed_proofs: list, token_symbols: list):
    """Request token data from cic-eth for the given symbols and proofs.

    Nothing is returned here: the Api is configured with the ``cic-ussd``
    callback queue and the
    ``cic_ussd.tasks.callback_handler.token_data_callback`` callback task.

    :param hashed_proofs: Hashed proofs passed to the API as ``proof``.
    :type hashed_proofs: list
    :param token_symbols: Symbols of the tokens whose data is requested.
    :type token_symbols: list
    :return: None
    :rtype: None
    """
    api = Api(
        callback_param='',
        callback_queue='cic-ussd',
        chain_str=Chain.spec.__str__(),
        callback_task='cic_ussd.tasks.callback_handler.token_data_callback',
    )
    api.tokens(token_symbols=token_symbols, proof=hashed_proofs)