cic-stack/apps/cic-ussd/cic_ussd/tasks/callback_handler.py

248 lines
9.6 KiB
Python
Raw Normal View History

2021-02-06 16:13:47 +01:00
# standard imports
import json
import logging
from datetime import datetime, timedelta
2021-02-06 16:13:47 +01:00
# third-party imports
import celery
# local imports
from cic_ussd.conversions import from_wei
2021-02-06 16:13:47 +01:00
from cic_ussd.db.models.base import SessionBase
2021-04-19 10:44:40 +02:00
from cic_ussd.db.models.account import Account
from cic_ussd.account import define_account_tx_metadata
2021-02-06 16:13:47 +01:00
from cic_ussd.error import ActionDataNotFoundError
from cic_ussd.redis import InMemoryStore, cache_data, create_cached_data_key
from cic_ussd.tasks.base import CriticalSQLAlchemyTask
2021-02-06 16:13:47 +01:00
from cic_ussd.transactions import IncomingTransactionProcessor
logg = logging.getLogger(__file__)
celery_app = celery.current_app
@celery_app.task(bind=True, base=CriticalSQLAlchemyTask)
2021-02-06 16:13:47 +01:00
def process_account_creation_callback(self, result: str, url: str, status_code: int):
"""This function defines a task that creates a user and
:param result: The blockchain address for the created account
:type result: str
:param url: URL provided to callback task in cic-eth should http be used for callback.
:type url: str
:param status_code: The status of the task to create an account
:type status_code: int
"""
session = SessionBase.create_session()
cache = InMemoryStore.cache
task_id = self.request.root_id
# get account creation status
account_creation_data = cache.get(task_id)
# check status
if account_creation_data:
account_creation_data = json.loads(account_creation_data)
if status_code == 0:
# update redis data
account_creation_data['status'] = 'CREATED'
cache.set(name=task_id, value=json.dumps(account_creation_data))
cache.persist(task_id)
phone_number = account_creation_data.get('phone_number')
# create user
2021-04-19 10:44:40 +02:00
user = Account(blockchain_address=result, phone_number=phone_number)
2021-02-06 16:13:47 +01:00
session.add(user)
session.commit()
session.close()
2021-06-23 10:54:34 +02:00
queue = self.request.delivery_info.get('routing_key')
2021-06-23 10:54:34 +02:00
# add phone number metadata lookup
s_phone_pointer = celery.signature(
'cic_ussd.tasks.metadata.add_phone_pointer',
2021-04-14 11:00:10 +02:00
[result, phone_number]
)
2021-06-23 10:54:34 +02:00
s_phone_pointer.apply_async(queue=queue)
# add custom metadata tags
custom_metadata = {
"tags": ["ussd", "individual"]
}
s_custom_metadata = celery.signature(
'cic_ussd.tasks.metadata.add_custom_metadata',
[result, custom_metadata]
)
s_custom_metadata.apply_async(queue=queue)
2021-02-06 16:13:47 +01:00
# expire cache
cache.expire(task_id, timedelta(seconds=180))
2021-02-06 16:13:47 +01:00
else:
session.close()
cache.expire(task_id, timedelta(seconds=180))
2021-02-06 16:13:47 +01:00
else:
session.close()
raise ActionDataNotFoundError(f'Account creation task: {task_id}, returned unexpected response: {status_code}')
session.close()
2021-02-06 16:13:47 +01:00
@celery_app.task
def process_incoming_transfer_callback(result: dict, param: str, status_code: int):
session = SessionBase.create_session()
if status_code == 0:
2021-02-06 16:13:47 +01:00
# collect result data
recipient_blockchain_address = result.get('recipient')
sender_blockchain_address = result.get('sender')
2021-04-16 22:24:07 +02:00
token_symbol = result.get('destination_token_symbol')
value = result.get('destination_token_value')
2021-02-06 16:13:47 +01:00
# try to find users in system
2021-04-19 10:44:40 +02:00
recipient_user = session.query(Account).filter_by(blockchain_address=recipient_blockchain_address).first()
sender_user = session.query(Account).filter_by(blockchain_address=sender_blockchain_address).first()
2021-02-06 16:13:47 +01:00
# check whether recipient is in the system
if not recipient_user:
session.close()
raise ValueError(
f'Tx for recipient: {recipient_blockchain_address} was received but has no matching user in the system.'
)
# process incoming transactions
incoming_tx_processor = IncomingTransactionProcessor(phone_number=recipient_user.phone_number,
preferred_language=recipient_user.preferred_language,
token_symbol=token_symbol,
value=value)
if param == 'tokengift':
incoming_tx_processor.process_token_gift_incoming_transactions()
2021-02-06 16:13:47 +01:00
elif param == 'transfer':
if sender_user:
sender_information = define_account_tx_metadata(user=sender_user)
incoming_tx_processor.process_transfer_incoming_transaction(
sender_information=sender_information,
recipient_blockchain_address=recipient_blockchain_address
)
2021-02-06 16:13:47 +01:00
else:
logg.warning(
f'Tx with sender: {sender_blockchain_address} was received but has no matching user in the system.'
)
incoming_tx_processor.process_transfer_incoming_transaction(
sender_information='GRASSROOTS ECONOMICS',
recipient_blockchain_address=recipient_blockchain_address
)
2021-02-06 16:13:47 +01:00
else:
session.close()
raise ValueError(f'Unexpected transaction param: {param}.')
else:
session.close()
raise ValueError(f'Unexpected status code: {status_code}.')
session.close()
@celery_app.task
def process_balances_callback(result: list, param: str, status_code: int):
if status_code == 0:
balances_data = result[0]
blockchain_address = balances_data.get('address')
key = create_cached_data_key(
identifier=bytes.fromhex(blockchain_address[2:]),
2021-05-01 16:52:54 +02:00
salt=':cic.balances_data'
)
cache_data(key=key, data=json.dumps(balances_data))
else:
raise ValueError(f'Unexpected status code: {status_code}.')
# TODO: clean up this handler
def define_transaction_action_tag(
preferred_language: str,
sender_blockchain_address: str,
param: str):
# check if out going ot incoming transaction
if sender_blockchain_address == param:
# check preferred language
if preferred_language == 'en':
action_tag = 'SENT'
2021-04-06 19:53:38 +02:00
direction = 'TO'
else:
action_tag = 'ULITUMA'
2021-04-06 19:53:38 +02:00
direction = 'KWA'
else:
if preferred_language == 'en':
action_tag = 'RECEIVED'
2021-04-06 19:53:38 +02:00
direction = 'FROM'
else:
action_tag = 'ULIPOKEA'
2021-04-06 19:53:38 +02:00
direction = 'KUTOKA'
return action_tag, direction
@celery_app.task
def process_statement_callback(result, param: str, status_code: int):
if status_code == 0:
# create session
processed_transactions = []
# process transaction data to cache
for transaction in result:
sender_blockchain_address = transaction.get('sender')
recipient_address = transaction.get('recipient')
source_token = transaction.get('source_token')
# filter out any transactions that are "gassy"
if '0x0000000000000000000000000000000000000000' in source_token:
pass
else:
session = SessionBase.create_session()
# describe a processed transaction
processed_transaction = {}
# check if sender is in the system
2021-04-19 10:44:40 +02:00
sender: Account = session.query(Account).filter_by(blockchain_address=sender_blockchain_address).first()
owner: Account = session.query(Account).filter_by(blockchain_address=param).first()
if sender:
processed_transaction['sender_phone_number'] = sender.phone_number
2021-04-06 19:53:38 +02:00
action_tag, direction = define_transaction_action_tag(
preferred_language=owner.preferred_language,
sender_blockchain_address=sender_blockchain_address,
param=param
)
processed_transaction['action_tag'] = action_tag
2021-04-06 19:53:38 +02:00
processed_transaction['direction'] = direction
else:
processed_transaction['sender_phone_number'] = 'GRASSROOTS ECONOMICS'
# check if recipient is in the system
2021-04-19 10:44:40 +02:00
recipient: Account = session.query(Account).filter_by(blockchain_address=recipient_address).first()
if recipient:
processed_transaction['recipient_phone_number'] = recipient.phone_number
else:
logg.warning(f'Tx with recipient not found in cic-ussd')
session.close()
# add transaction values
2021-04-06 19:53:38 +02:00
processed_transaction['to_value'] = from_wei(value=transaction.get('to_value')).__str__()
processed_transaction['from_value'] = from_wei(value=transaction.get('from_value')).__str__()
raw_timestamp = transaction.get('timestamp')
timestamp = datetime.utcfromtimestamp(raw_timestamp).strftime('%d/%m/%y, %H:%M')
processed_transaction['timestamp'] = timestamp
processed_transactions.append(processed_transaction)
# cache account statement
identifier = bytes.fromhex(param[2:])
2021-05-01 16:52:54 +02:00
key = create_cached_data_key(identifier=identifier, salt=':cic.statement')
data = json.dumps(processed_transactions)
# cache statement data
cache_data(key=key, data=data)
else:
raise ValueError(f'Unexpected status code: {status_code}.')