From 22e142a2e34d1115836b10f9b2c616b9f2ca7406 Mon Sep 17 00:00:00 2001 From: PhilipWafula Date: Mon, 22 Nov 2021 13:26:01 +0300 Subject: [PATCH] Improves callback handle to handle multi-token ops. --- .../cic_ussd/tasks/callback_handler.py | 60 +++++++++++++------ 1 file changed, 43 insertions(+), 17 deletions(-) diff --git a/apps/cic-ussd/cic_ussd/tasks/callback_handler.py b/apps/cic-ussd/cic_ussd/tasks/callback_handler.py index 5e8895e3..08bd56fa 100644 --- a/apps/cic-ussd/cic_ussd/tasks/callback_handler.py +++ b/apps/cic-ussd/cic_ussd/tasks/callback_handler.py @@ -1,4 +1,5 @@ # standard imports + import json import logging from datetime import timedelta @@ -14,9 +15,15 @@ from cic_ussd.cache import Cache, cache_data, cache_data_key, get_cached_data from cic_ussd.account.chain import Chain from cic_ussd.db.models.base import SessionBase from cic_ussd.db.models.account import Account +from cic_ussd.processor.util import wait_for_cache from cic_ussd.account.statement import filter_statement_transactions from cic_ussd.account.transaction import transaction_actors -from cic_ussd.account.tokens import collate_token_metadata, process_token_data +from cic_ussd.account.tokens import (collate_token_metadata, + get_cached_token_data, + get_default_token_symbol, + handle_token_symbol_list, + process_token_data, + set_active_token) from cic_ussd.error import AccountCreationDataNotFound from cic_ussd.tasks.base import CriticalSQLAlchemyTask @@ -58,6 +65,9 @@ def account_creation_callback(self, result: str, url: str, status_code: int): session.close() logg.debug(f'recorded account with identifier: {result}') + token_symbol = get_default_token_symbol() + set_active_token(blockchain_address=result, token_symbol=token_symbol) + queue = self.request.delivery_info.get('routing_key') s_phone_pointer = celery.signature( 'cic_ussd.tasks.metadata.add_phone_pointer', [result, phone_number], queue=queue @@ -88,8 +98,16 @@ def balances_callback(result: list, param: str, status_code: int): raise ValueError(f'Unexpected status code: {status_code}.') balances = result[0] - identifier = bytes.fromhex(param) - key = cache_data_key(identifier, MetadataPointer.BALANCES) + identifier = [] + param = param.split(',') + for identity in param: + try: + i = bytes.fromhex(identity) + identifier.append(i) + except ValueError: + i = identity.encode('utf-8') + identifier.append(i) + key = cache_data_key(identifier=identifier, salt=MetadataPointer.BALANCES) cache_data(key, json.dumps(balances)) @@ -138,17 +156,20 @@ def token_data_callback(result: dict, param: str, status_code: int): if status_code != 0: raise ValueError(f'Unexpected status code: {status_code}.') - for key, value in result.items(): - token_symbol = result[key].get('symbol') - identifier = token_symbol.encode('utf-8') - token_meta_key = cache_data_key(identifier, MetadataPointer.TOKEN_META_SYMBOL) - token_info_key = cache_data_key(identifier, MetadataPointer.TOKEN_PROOF_SYMBOL) - token_meta = get_cached_data(token_meta_key) - token_info = get_cached_data(token_info_key) - token_data = collate_token_metadata(token_info=token_info, token_metadata=token_meta) - token_data = {**token_data, **result[key]} - token_data_key = cache_data_key(identifier, MetadataPointer.TOKEN_DATA) - cache_data(token_data_key, json.dumps(token_data)) + token = result[0] + token_symbol = token.get('symbol') + identifier = token_symbol.encode('utf-8') + token_meta_key = cache_data_key(identifier, MetadataPointer.TOKEN_META_SYMBOL) + token_info_key = cache_data_key(identifier, MetadataPointer.TOKEN_PROOF_SYMBOL) + token_meta = get_cached_data(token_meta_key) + token_meta = json.loads(token_meta) + token_info = get_cached_data(token_info_key) + token_info = json.loads(token_info) + token_data = collate_token_metadata(token_info=token_info, token_metadata=token_meta) + token_data = {**token_data, **token} + token_data_key = cache_data_key([bytes.fromhex(param), identifier], MetadataPointer.TOKEN_DATA) + cache_data(token_data_key, json.dumps(token_data)) + handle_token_symbol_list(blockchain_address=param, token_symbol=token_symbol) @celery_app.task(bind=True) @@ -167,10 +188,15 @@ def transaction_balances_callback(self, result: list, param: dict, status_code: """ if status_code != 0: raise ValueError(f'Unexpected status code: {status_code}.') - balances_data = result[0] - available_balance = calculate_available_balance(balances_data) transaction = param + token_symbol = transaction.get('token_symbol') + blockchain_address = transaction.get('blockchain_address') + identifier = [bytes.fromhex(blockchain_address), token_symbol.encode('utf-8')] + wait_for_cache(identifier, f'Cached token data for: {token_symbol}', MetadataPointer.TOKEN_DATA) + token_data = get_cached_token_data(blockchain_address, token_symbol) + decimals = token_data.get('decimals') + available_balance = calculate_available_balance(balances_data, decimals) transaction['available_balance'] = available_balance queue = self.request.delivery_info.get('routing_key') @@ -204,7 +230,7 @@ def transaction_callback(result: dict, param: str, status_code: int): source_token_symbol = result.get('source_token_symbol') source_token_value = result.get('source_token_value') - process_token_data(token_symbol=destination_token_symbol) + process_token_data(blockchain_address=recipient_blockchain_address, token_symbol=destination_token_symbol) recipient_metadata = { "alt_blockchain_address": sender_blockchain_address,