2021-08-06 18:29:01 +02:00
|
|
|
# standard imports
|
2021-10-25 20:01:25 +02:00
|
|
|
import hashlib
|
2021-08-06 18:29:01 +02:00
|
|
|
import json
|
|
|
|
import logging
|
|
|
|
from typing import Dict, Optional
|
|
|
|
|
|
|
|
# external imports
|
|
|
|
from cic_eth.api import Api
|
2021-10-20 17:02:36 +02:00
|
|
|
from cic_types.condiments import MetadataPointer
|
2021-08-06 18:29:01 +02:00
|
|
|
|
|
|
|
# local imports
|
|
|
|
from cic_ussd.account.chain import Chain
|
|
|
|
from cic_ussd.cache import cache_data_key, get_cached_data
|
2021-10-25 20:01:25 +02:00
|
|
|
from cic_ussd.error import CachedDataNotFoundError, SeppukuError
|
|
|
|
from cic_ussd.metadata.tokens import query_token_info, query_token_metadata
|
2021-08-06 18:29:01 +02:00
|
|
|
|
|
|
|
|
|
|
|
logg = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
2021-10-25 20:01:25 +02:00
|
|
|
def collate_token_metadata(token_info: dict, token_metadata: dict) -> dict:
|
|
|
|
"""
|
|
|
|
:param token_info:
|
|
|
|
:type token_info:
|
|
|
|
:param token_metadata:
|
|
|
|
:type token_metadata:
|
|
|
|
:return:
|
|
|
|
:rtype:
|
|
|
|
"""
|
|
|
|
description = token_info.get('description')
|
|
|
|
issuer = token_info.get('issuer')
|
|
|
|
location = token_metadata.get('location')
|
|
|
|
contact = token_metadata.get('contact')
|
|
|
|
return {
|
|
|
|
'description': description,
|
|
|
|
'issuer': issuer,
|
|
|
|
'location': location,
|
|
|
|
'contact': contact
|
|
|
|
}
|
|
|
|
|
|
|
|
|
2021-08-06 18:29:01 +02:00
|
|
|
def get_cached_default_token(chain_str: str) -> Optional[str]:
|
|
|
|
"""This function attempts to retrieve the default token's data from the redis cache.
|
|
|
|
:param chain_str: chain name and network id.
|
|
|
|
:type chain_str: str
|
|
|
|
:return:
|
|
|
|
:rtype:
|
|
|
|
"""
|
|
|
|
logg.debug(f'Retrieving default token from cache for chain: {chain_str}')
|
2021-10-20 17:02:36 +02:00
|
|
|
key = cache_data_key(identifier=chain_str.encode('utf-8'), salt=MetadataPointer.TOKEN_DEFAULT)
|
2021-08-06 18:29:01 +02:00
|
|
|
return get_cached_data(key=key)
|
|
|
|
|
|
|
|
|
|
|
|
def get_default_token_symbol():
|
|
|
|
"""This function attempts to retrieve the default token's symbol from cached default token's data.
|
|
|
|
:raises SeppukuError: The system should terminate itself because the default token is required for an appropriate
|
|
|
|
system state.
|
|
|
|
:return: Default token's symbol.
|
|
|
|
:rtype: str
|
|
|
|
"""
|
|
|
|
chain_str = Chain.spec.__str__()
|
|
|
|
cached_default_token = get_cached_default_token(chain_str)
|
|
|
|
if cached_default_token:
|
|
|
|
default_token_data = json.loads(cached_default_token)
|
|
|
|
return default_token_data.get('symbol')
|
|
|
|
else:
|
|
|
|
logg.warning('Cached default token data not found. Attempting retrieval from default token API')
|
|
|
|
default_token_data = query_default_token(chain_str)
|
|
|
|
if default_token_data:
|
|
|
|
return default_token_data.get('symbol')
|
|
|
|
else:
|
|
|
|
raise SeppukuError(f'Could not retrieve default token for: {chain_str}')
|
|
|
|
|
|
|
|
|
2021-10-25 20:01:25 +02:00
|
|
|
def hashed_token_proof(token_proof: dict) -> str:
|
|
|
|
"""
|
|
|
|
:param token_proof:
|
|
|
|
:type token_proof:
|
|
|
|
:return:
|
|
|
|
:rtype:
|
|
|
|
"""
|
|
|
|
|
|
|
|
hash_object = hashlib.new("sha256")
|
2021-11-05 12:11:44 +01:00
|
|
|
hash_object.update(json.dumps(token_proof).encode('utf-8'))
|
2021-10-25 20:01:25 +02:00
|
|
|
return hash_object.digest().hex()
|
|
|
|
|
|
|
|
|
|
|
|
def process_token_data(token_symbol: str):
|
|
|
|
"""
|
|
|
|
:param token_symbol:
|
|
|
|
:type token_symbol:
|
|
|
|
:return:
|
|
|
|
:rtype:
|
|
|
|
"""
|
|
|
|
identifier = token_symbol.encode('utf-8')
|
|
|
|
query_token_metadata(identifier=identifier)
|
|
|
|
token_info = query_token_info(identifier=identifier)
|
|
|
|
hashed_token_info = hashed_token_proof(token_proof=token_info)
|
|
|
|
query_token_data(hashed_proofs=[hashed_token_info], token_symbols=[token_symbol])
|
|
|
|
|
|
|
|
|
2021-08-06 18:29:01 +02:00
|
|
|
def query_default_token(chain_str: str):
|
|
|
|
"""This function synchronously queries cic-eth for the deployed system's default token.
|
|
|
|
:param chain_str: Chain name and network id.
|
|
|
|
:type chain_str: str
|
|
|
|
:return: Token's data.
|
|
|
|
:rtype: dict
|
|
|
|
"""
|
|
|
|
logg.debug(f'Querying API for default token on chain: {chain_str}')
|
|
|
|
cic_eth_api = Api(chain_str=chain_str)
|
|
|
|
default_token_request_task = cic_eth_api.default_token()
|
|
|
|
return default_token_request_task.get()
|
2021-10-25 20:01:25 +02:00
|
|
|
|
|
|
|
|
|
|
|
def query_token_data(hashed_proofs: list, token_symbols: list):
|
|
|
|
"""
|
|
|
|
:param hashed_proofs:
|
|
|
|
:type hashed_proofs:
|
|
|
|
:param token_symbols:
|
|
|
|
:type token_symbols:
|
|
|
|
:return:
|
|
|
|
:rtype:
|
|
|
|
"""
|
|
|
|
api = Api(callback_param='',
|
|
|
|
callback_queue='cic-ussd',
|
|
|
|
chain_str=Chain.spec.__str__(),
|
|
|
|
callback_task='cic_ussd.tasks.callback_handler.token_data_callback')
|
|
|
|
api.tokens(token_symbols=token_symbols, proof=hashed_proofs)
|