cic-internal-integration/apps/cic-ussd/cic_ussd/tasks/processor.py

96 lines
3.7 KiB
Python
Raw Permalink Normal View History

2021-07-20 18:18:27 +02:00
# standard imports
2021-08-06 18:29:01 +02:00
import json
2021-07-20 18:18:27 +02:00
import logging
# external imports
2021-07-20 18:18:27 +02:00
import celery
2021-08-06 18:29:01 +02:00
import i18n
from cic_types.condiments import MetadataPointer
2021-07-20 18:18:27 +02:00
# local imports
2021-08-29 11:55:47 +02:00
from cic_ussd.account.metadata import get_cached_preferred_language
2021-08-06 18:29:01 +02:00
from cic_ussd.account.statement import get_cached_statement
from cic_ussd.account.transaction import aux_transaction_data, validate_transaction_account
from cic_ussd.cache import cache_data, cache_data_key
2021-08-25 12:33:35 +02:00
from cic_ussd.db.models.account import Account
2021-07-20 18:18:27 +02:00
from cic_ussd.db.models.base import SessionBase
celery_app = celery.current_app
logg = logging.getLogger(__file__)
2021-08-06 18:29:01 +02:00
@celery_app.task(bind=True)
def generate_statement(self, querying_party: str, transaction: dict):
""""""
queue = self.request.delivery_info.get('routing_key')
s_parse_transaction = celery.signature(
'cic_ussd.tasks.processor.parse_transaction', [transaction], queue=queue
)
s_cache_statement = celery.signature(
'cic_ussd.tasks.processor.cache_statement', [querying_party], queue=queue
)
2021-10-07 17:12:35 +02:00
celery.chain(s_parse_transaction, s_cache_statement).apply_async()
2021-08-06 18:29:01 +02:00
2021-07-20 18:18:27 +02:00
@celery_app.task
2021-08-06 18:29:01 +02:00
def cache_statement(parsed_transaction: dict, querying_party: str):
2021-07-20 18:18:27 +02:00
"""
2021-08-06 18:29:01 +02:00
:param parsed_transaction:
:type parsed_transaction:
:param querying_party:
:type querying_party:
2021-07-20 18:18:27 +02:00
:return:
:rtype:
"""
2021-08-06 18:29:01 +02:00
cached_statement = get_cached_statement(querying_party)
statement_transactions = []
if cached_statement:
statement_transactions = json.loads(cached_statement)
if parsed_transaction not in statement_transactions:
statement_transactions.append(parsed_transaction)
2021-08-06 18:29:01 +02:00
data = json.dumps(statement_transactions)
2021-10-07 17:12:35 +02:00
identifier = bytes.fromhex(querying_party)
key = cache_data_key(identifier, MetadataPointer.STATEMENT)
2021-08-06 18:29:01 +02:00
cache_data(key, data)
2021-07-20 18:18:27 +02:00
2021-08-06 18:29:01 +02:00
@celery_app.task
2021-08-29 11:55:47 +02:00
def parse_transaction(transaction: dict) -> dict:
2021-08-06 18:29:01 +02:00
"""This function parses transaction objects and collates all relevant data for system use i.e:
- An account's set preferred language.
- Account identifier that facilitates notification.
- Contextual tags i.e action and direction tags.
:param transaction: Transaction object.
:type transaction: dict
:return: Transaction object with contextual data for use in the system.
:rtype: dict
"""
2021-08-29 11:55:47 +02:00
preferred_language = get_cached_preferred_language(transaction.get('blockchain_address'))
2021-07-20 18:18:27 +02:00
if not preferred_language:
2021-08-06 18:29:01 +02:00
preferred_language = i18n.config.get('fallback')
2021-08-25 12:33:35 +02:00
transaction['preferred_language'] = preferred_language
2021-08-06 18:29:01 +02:00
transaction = aux_transaction_data(preferred_language, transaction)
2021-07-20 18:18:27 +02:00
session = SessionBase.create_session()
2021-08-25 12:33:35 +02:00
role = transaction.get('role')
alt_blockchain_address = transaction.get('alt_blockchain_address')
blockchain_address = transaction.get('blockchain_address')
identifier = bytes.fromhex(blockchain_address)
token_symbol = transaction.get('token_symbol')
if role == 'recipient':
key = cache_data_key(identifier=identifier, salt=MetadataPointer.TOKEN_LAST_RECEIVED)
cache_data(key, token_symbol)
if role == 'sender':
key = cache_data_key(identifier=identifier, salt=MetadataPointer.TOKEN_LAST_SENT)
cache_data(key, token_symbol)
2021-08-25 12:33:35 +02:00
account = validate_transaction_account(blockchain_address, role, session)
alt_account = session.query(Account).filter_by(blockchain_address=alt_blockchain_address).first()
if alt_account:
transaction['alt_metadata_id'] = alt_account.standard_metadata_id()
2021-08-29 11:55:47 +02:00
else:
transaction['alt_metadata_id'] = 'GRASSROOTS ECONOMICS'
2021-08-25 12:33:35 +02:00
transaction['metadata_id'] = account.standard_metadata_id()
2021-08-06 18:29:01 +02:00
transaction['phone_number'] = account.phone_number
session.close()
return transaction