cic-internal-integration/apps/cic-ussd/cic_ussd/tasks/callback_handler.py

243 lines
8.4 KiB
Python
Raw Normal View History

2021-02-06 16:13:47 +01:00
# standard imports
import json
import logging
2021-08-06 18:29:01 +02:00
from datetime import timedelta
2021-02-06 16:13:47 +01:00
# external imports
2021-02-06 16:13:47 +01:00
import celery
from cic_types.condiments import MetadataPointer
2021-02-06 16:13:47 +01:00
# local imports
2021-08-06 18:29:01 +02:00
from cic_ussd.account.balance import get_balances, calculate_available_balance
from cic_ussd.account.statement import generate
from cic_ussd.cache import Cache, cache_data, cache_data_key, get_cached_data
from cic_ussd.account.chain import Chain
2021-02-06 16:13:47 +01:00
from cic_ussd.db.models.base import SessionBase
2021-04-19 10:44:40 +02:00
from cic_ussd.db.models.account import Account
2021-08-06 18:29:01 +02:00
from cic_ussd.account.statement import filter_statement_transactions
from cic_ussd.account.transaction import transaction_actors
from cic_ussd.account.tokens import collate_token_metadata, process_token_data
2021-08-06 18:29:01 +02:00
from cic_ussd.error import AccountCreationDataNotFound
from cic_ussd.tasks.base import CriticalSQLAlchemyTask
2021-02-06 16:13:47 +01:00
logg = logging.getLogger(__file__)
celery_app = celery.current_app
@celery_app.task(bind=True, base=CriticalSQLAlchemyTask)
2021-08-06 18:29:01 +02:00
def account_creation_callback(self, result: str, url: str, status_code: int):
2021-02-06 16:13:47 +01:00
"""This function defines a task that creates a user and
2021-08-06 18:29:01 +02:00
:param self: Reference providing access to the callback task instance.
:type self: celery.Task
2021-02-06 16:13:47 +01:00
:param result: The blockchain address for the created account
:type result: str
:param url: URL provided to callback task in cic-eth should http be used for callback.
:type url: str
:param status_code: The status of the task to create an account
:type status_code: int
"""
2021-08-06 18:29:01 +02:00
task_uuid = self.request.root_id
cached_account_creation_data = get_cached_data(task_uuid)
2021-02-06 16:13:47 +01:00
2021-08-06 18:29:01 +02:00
if not cached_account_creation_data:
raise AccountCreationDataNotFound(f'No account creation data found for task id: {task_uuid}')
2021-02-06 16:13:47 +01:00
2021-08-06 18:29:01 +02:00
if status_code != 0:
raise ValueError(f'Unexpected status code: {status_code}')
2021-02-06 16:13:47 +01:00
2021-08-06 18:29:01 +02:00
account_creation_data = json.loads(cached_account_creation_data)
account_creation_data['status'] = 'CREATED'
cache_data(task_uuid, json.dumps(account_creation_data))
2021-06-23 10:54:34 +02:00
2021-08-06 18:29:01 +02:00
phone_number = account_creation_data.get('phone_number')
2021-06-23 10:54:34 +02:00
2021-08-06 18:29:01 +02:00
session = SessionBase.create_session()
account = Account(blockchain_address=result, phone_number=phone_number)
session.add(account)
session.commit()
session.close()
2021-10-07 17:12:35 +02:00
logg.debug(f'recorded account with identifier: {result}')
2021-06-23 10:54:34 +02:00
2021-08-06 18:29:01 +02:00
queue = self.request.delivery_info.get('routing_key')
s_phone_pointer = celery.signature(
'cic_ussd.tasks.metadata.add_phone_pointer', [result, phone_number], queue=queue
)
s_phone_pointer.apply_async()
2021-02-06 16:13:47 +01:00
2021-08-06 18:29:01 +02:00
custom_metadata = {"tags": ["ussd", "individual"]}
s_custom_metadata = celery.signature(
'cic_ussd.tasks.metadata.add_custom_metadata', [result, custom_metadata], queue=queue
)
s_custom_metadata.apply_async()
Cache.store.expire(task_uuid, timedelta(seconds=180))
2021-02-06 16:13:47 +01:00
2021-08-06 18:29:01 +02:00
@celery_app.task
def balances_callback(result: list, param: str, status_code: int):
"""
:param result:
:type result:
:param param:
:type param:
:param status_code:
:type status_code:
:return:
:rtype:
"""
if status_code != 0:
raise ValueError(f'Unexpected status code: {status_code}.')
2021-02-06 16:13:47 +01:00
2021-08-06 18:29:01 +02:00
balances = result[0]
2021-10-07 17:12:35 +02:00
identifier = bytes.fromhex(param)
key = cache_data_key(identifier, MetadataPointer.BALANCES)
2021-08-06 18:29:01 +02:00
cache_data(key, json.dumps(balances))
2021-02-06 16:13:47 +01:00
2021-07-20 18:18:27 +02:00
@celery_app.task(bind=True)
2021-08-06 18:29:01 +02:00
def statement_callback(self, result, param: str, status_code: int):
"""
:param self:
:type self:
:param result:
:type result:
:param param:
:type param:
:param status_code:
:type status_code:
:return:
:rtype:
"""
if status_code != 0:
raise ValueError(f'Unexpected status code: {status_code}.')
2021-07-20 18:18:27 +02:00
2021-08-06 18:29:01 +02:00
queue = self.request.delivery_info.get('routing_key')
statement_transactions = filter_statement_transactions(result)
for transaction in statement_transactions:
recipient_transaction, sender_transaction = transaction_actors(transaction)
if recipient_transaction.get('blockchain_address') == param:
2021-10-07 17:12:35 +02:00
recipient_transaction['alt_blockchain_address'] = sender_transaction.get('blockchain_address')
2021-08-06 18:29:01 +02:00
generate(param, queue, recipient_transaction)
if sender_transaction.get('blockchain_address') == param:
2021-10-07 17:12:35 +02:00
sender_transaction['alt_blockchain_address'] = recipient_transaction.get('blockchain_address')
2021-08-06 18:29:01 +02:00
generate(param, queue, sender_transaction)
2021-07-20 18:18:27 +02:00
2021-02-06 16:13:47 +01:00
@celery_app.task
def token_data_callback(result: dict, param: str, status_code: int):
"""
:param result:
:type result:
:param param:
:type param:
:param status_code:
:type status_code:
:return:
:rtype:
"""
if status_code != 0:
raise ValueError(f'Unexpected status code: {status_code}.')
for key, value in result.items():
token_symbol = result[key].get('symbol')
identifier = token_symbol.encode('utf-8')
token_meta_key = cache_data_key(identifier, MetadataPointer.TOKEN_META_SYMBOL)
token_info_key = cache_data_key(identifier, MetadataPointer.TOKEN_PROOF_SYMBOL)
token_meta = get_cached_data(token_meta_key)
token_info = get_cached_data(token_info_key)
token_data = collate_token_metadata(token_info=token_info, token_metadata=token_meta)
token_data = {**token_data, **result[key]}
token_data_key = cache_data_key(identifier, MetadataPointer.TOKEN_DATA)
cache_data(token_data_key, json.dumps(token_data))
2021-08-06 18:29:01 +02:00
@celery_app.task(bind=True)
def transaction_balances_callback(self, result: list, param: dict, status_code: int):
"""
:param self:
:type self:
:param result:
:type result:
:param param:
:type param:
:param status_code:
:type status_code:
:return:
:rtype:
"""
if status_code != 0:
2021-07-20 18:18:27 +02:00
raise ValueError(f'Unexpected status code: {status_code}.')
2021-02-06 16:13:47 +01:00
2021-08-06 18:29:01 +02:00
balances_data = result[0]
available_balance = calculate_available_balance(balances_data)
transaction = param
transaction['available_balance'] = available_balance
2021-07-20 18:18:27 +02:00
queue = self.request.delivery_info.get('routing_key')
2021-08-06 18:29:01 +02:00
s_process_account_metadata = celery.signature(
'cic_ussd.tasks.processor.parse_transaction', [transaction], queue=queue
)
s_notify_account = celery.signature('cic_ussd.tasks.notifications.transaction', queue=queue)
2021-08-29 11:55:47 +02:00
celery.chain(s_process_account_metadata, s_notify_account).apply_async()
@celery_app.task
2021-08-06 18:29:01 +02:00
def transaction_callback(result: dict, param: str, status_code: int):
"""
:param result:
:type result:
:param param:
:type param:
:param status_code:
:type status_code:
:return:
:rtype:
"""
if status_code != 0:
raise ValueError(f'Unexpected status code: {status_code}.')
2021-08-06 18:29:01 +02:00
chain_str = Chain.spec.__str__()
destination_token_symbol = result.get('destination_token_symbol')
destination_token_value = result.get('destination_token_value')
recipient_blockchain_address = result.get('recipient')
sender_blockchain_address = result.get('sender')
source_token_symbol = result.get('source_token_symbol')
source_token_value = result.get('source_token_value')
process_token_data(token_symbol=destination_token_symbol)
2021-08-06 18:29:01 +02:00
recipient_metadata = {
2021-08-25 12:33:35 +02:00
"alt_blockchain_address": sender_blockchain_address,
2021-08-06 18:29:01 +02:00
"blockchain_address": recipient_blockchain_address,
"role": "recipient",
2021-08-25 12:33:35 +02:00
"token_symbol": destination_token_symbol,
"token_value": destination_token_value,
2021-08-06 18:29:01 +02:00
"transaction_type": param
}
get_balances(
address=recipient_blockchain_address,
callback_param=recipient_metadata,
chain_str=chain_str,
callback_task='cic_ussd.tasks.callback_handler.transaction_balances_callback',
token_symbol=destination_token_symbol,
asynchronous=True)
if param == 'transfer':
sender_metadata = {
2021-08-25 12:33:35 +02:00
"alt_blockchain_address": recipient_blockchain_address,
2021-08-06 18:29:01 +02:00
"blockchain_address": sender_blockchain_address,
2021-08-25 12:33:35 +02:00
"role": "sender",
2021-08-06 18:29:01 +02:00
"token_symbol": source_token_symbol,
"token_value": source_token_value,
"transaction_type": param
}
2021-08-06 18:29:01 +02:00
get_balances(
address=sender_blockchain_address,
callback_param=sender_metadata,
chain_str=chain_str,
callback_task='cic_ussd.tasks.callback_handler.transaction_balances_callback',
token_symbol=source_token_symbol,
asynchronous=True)