Improves callback handle to handle multi-token ops.

This commit is contained in:
PhilipWafula 2021-11-22 13:26:01 +03:00
parent f52494e633
commit 22e142a2e3
Signed by untrusted user: mango-habanero
GPG Key ID: B00CE9034DA19FB7
1 changed files with 43 additions and 17 deletions

View File

@ -1,4 +1,5 @@
# standard imports
import json
import logging
from datetime import timedelta
@ -14,9 +15,15 @@ from cic_ussd.cache import Cache, cache_data, cache_data_key, get_cached_data
from cic_ussd.account.chain import Chain
from cic_ussd.db.models.base import SessionBase
from cic_ussd.db.models.account import Account
from cic_ussd.processor.util import wait_for_cache
from cic_ussd.account.statement import filter_statement_transactions
from cic_ussd.account.transaction import transaction_actors
from cic_ussd.account.tokens import collate_token_metadata, process_token_data
from cic_ussd.account.tokens import (collate_token_metadata,
get_cached_token_data,
get_default_token_symbol,
handle_token_symbol_list,
process_token_data,
set_active_token)
from cic_ussd.error import AccountCreationDataNotFound
from cic_ussd.tasks.base import CriticalSQLAlchemyTask
@ -58,6 +65,9 @@ def account_creation_callback(self, result: str, url: str, status_code: int):
session.close()
logg.debug(f'recorded account with identifier: {result}')
token_symbol = get_default_token_symbol()
set_active_token(blockchain_address=result, token_symbol=token_symbol)
queue = self.request.delivery_info.get('routing_key')
s_phone_pointer = celery.signature(
'cic_ussd.tasks.metadata.add_phone_pointer', [result, phone_number], queue=queue
@ -88,8 +98,16 @@ def balances_callback(result: list, param: str, status_code: int):
raise ValueError(f'Unexpected status code: {status_code}.')
balances = result[0]
identifier = bytes.fromhex(param)
key = cache_data_key(identifier, MetadataPointer.BALANCES)
identifier = []
param = param.split(',')
for identity in param:
try:
i = bytes.fromhex(identity)
identifier.append(i)
except ValueError:
i = identity.encode('utf-8')
identifier.append(i)
key = cache_data_key(identifier=identifier, salt=MetadataPointer.BALANCES)
cache_data(key, json.dumps(balances))
@ -138,17 +156,20 @@ def token_data_callback(result: dict, param: str, status_code: int):
if status_code != 0:
raise ValueError(f'Unexpected status code: {status_code}.')
for key, value in result.items():
token_symbol = result[key].get('symbol')
identifier = token_symbol.encode('utf-8')
token_meta_key = cache_data_key(identifier, MetadataPointer.TOKEN_META_SYMBOL)
token_info_key = cache_data_key(identifier, MetadataPointer.TOKEN_PROOF_SYMBOL)
token_meta = get_cached_data(token_meta_key)
token_info = get_cached_data(token_info_key)
token_data = collate_token_metadata(token_info=token_info, token_metadata=token_meta)
token_data = {**token_data, **result[key]}
token_data_key = cache_data_key(identifier, MetadataPointer.TOKEN_DATA)
cache_data(token_data_key, json.dumps(token_data))
token = result[0]
token_symbol = token.get('symbol')
identifier = token_symbol.encode('utf-8')
token_meta_key = cache_data_key(identifier, MetadataPointer.TOKEN_META_SYMBOL)
token_info_key = cache_data_key(identifier, MetadataPointer.TOKEN_PROOF_SYMBOL)
token_meta = get_cached_data(token_meta_key)
token_meta = json.loads(token_meta)
token_info = get_cached_data(token_info_key)
token_info = json.loads(token_info)
token_data = collate_token_metadata(token_info=token_info, token_metadata=token_meta)
token_data = {**token_data, **token}
token_data_key = cache_data_key([bytes.fromhex(param), identifier], MetadataPointer.TOKEN_DATA)
cache_data(token_data_key, json.dumps(token_data))
handle_token_symbol_list(blockchain_address=param, token_symbol=token_symbol)
@celery_app.task(bind=True)
@ -167,10 +188,15 @@ def transaction_balances_callback(self, result: list, param: dict, status_code:
"""
if status_code != 0:
raise ValueError(f'Unexpected status code: {status_code}.')
balances_data = result[0]
available_balance = calculate_available_balance(balances_data)
transaction = param
token_symbol = transaction.get('token_symbol')
blockchain_address = transaction.get('blockchain_address')
identifier = [bytes.fromhex(blockchain_address), token_symbol.encode('utf-8')]
wait_for_cache(identifier, f'Cached token data for: {token_symbol}', MetadataPointer.TOKEN_DATA)
token_data = get_cached_token_data(blockchain_address, token_symbol)
decimals = token_data.get('decimals')
available_balance = calculate_available_balance(balances_data, decimals)
transaction['available_balance'] = available_balance
queue = self.request.delivery_info.get('routing_key')
@ -204,7 +230,7 @@ def transaction_callback(result: dict, param: str, status_code: int):
source_token_symbol = result.get('source_token_symbol')
source_token_value = result.get('source_token_value')
process_token_data(token_symbol=destination_token_symbol)
process_token_data(blockchain_address=recipient_blockchain_address, token_symbol=destination_token_symbol)
recipient_metadata = {
"alt_blockchain_address": sender_blockchain_address,