296 lines
11 KiB
Python
296 lines
11 KiB
Python
# standard imports
|
|
import json
|
|
import logging
|
|
from datetime import datetime, timedelta
|
|
|
|
# third-party imports
|
|
import celery
|
|
|
|
# local imports
|
|
from cic_ussd.balance import compute_operational_balance, get_balances
|
|
from cic_ussd.chain import Chain
|
|
from cic_ussd.conversions import from_wei
|
|
from cic_ussd.db.models.base import SessionBase
|
|
from cic_ussd.db.models.account import Account
|
|
from cic_ussd.error import ActionDataNotFoundError
|
|
from cic_ussd.redis import InMemoryStore, cache_data, create_cached_data_key, get_cached_data
|
|
from cic_ussd.tasks.base import CriticalSQLAlchemyTask
|
|
|
|
# Module-level logger; note it is named after this file's path (``__file__``), not the dotted module name.
logg = logging.getLogger(__file__)
|
|
# Current Celery application; the ``@celery_app.task`` decorators in this module register against it.
celery_app = celery.current_app
|
|
|
|
|
|
@celery_app.task(bind=True, base=CriticalSQLAlchemyTask)
def process_account_creation_callback(self, result: str, url: str, status_code: int):
    """Finalize a pending account creation once the creation task has called back.

    The pending request is read from the cache under this task's root id. When
    creation succeeded (``status_code == 0``) the cached entry is marked
    ``CREATED``, an :class:`Account` row is committed and the phone-pointer and
    custom-metadata tasks are dispatched, using the routing key of this task's
    delivery info as queue. On both success and failure the cached entry is set
    to expire after 180 seconds.

    :param result: The blockchain address for the created account
    :type result: str
    :param url: URL provided to callback task in cic-eth should http be used for callback.
    :type url: str
    :param status_code: The status of the task to create an account
    :type status_code: int
    :raises ActionDataNotFoundError: If nothing is cached under the root task id.
    """
    session = SessionBase.create_session()
    try:
        cache = InMemoryStore.cache
        task_id = self.request.root_id

        # get account creation status
        account_creation_data = cache.get(task_id)
        if not account_creation_data:
            raise ActionDataNotFoundError(
                f'Account creation task: {task_id}, returned unexpected response: {status_code}')

        account_creation_data = json.loads(account_creation_data)

        if status_code != 0:
            # creation did not succeed: only schedule the pending entry for expiry
            cache.expire(task_id, timedelta(seconds=180))
            return

        # update redis data
        account_creation_data['status'] = 'CREATED'
        cache.set(name=task_id, value=json.dumps(account_creation_data))
        cache.persist(task_id)

        phone_number = account_creation_data.get('phone_number')

        # create user
        user = Account(blockchain_address=result, phone_number=phone_number)
        session.add(user)
        session.commit()
        # release the session before talking to the broker; the ``finally``
        # below is the safety net for every other exit path
        session.close()

        queue = self.request.delivery_info.get('routing_key')

        # add phone number metadata lookup
        s_phone_pointer = celery.signature(
            'cic_ussd.tasks.metadata.add_phone_pointer',
            [result, phone_number]
        )
        s_phone_pointer.apply_async(queue=queue)

        # add custom metadata tags
        custom_metadata = {
            "tags": ["ussd", "individual"]
        }
        s_custom_metadata = celery.signature(
            'cic_ussd.tasks.metadata.add_custom_metadata',
            [result, custom_metadata]
        )
        s_custom_metadata.apply_async(queue=queue)

        # expire cache
        cache.expire(task_id, timedelta(seconds=180))
    finally:
        # always hand the session back, including when parsing, the commit
        # or a dispatch raises
        session.close()
|
|
|
|
|
|
@celery_app.task(bind=True)
def process_transaction_callback(self, result: dict, param: str, status_code: int):
    """Request balances for the parties of a transaction that was called back.

    ``get_balances`` is always invoked for the recipient, and additionally for
    the sender when ``param`` is ``'transfer'``. Each call names
    ``process_transaction_balances_callback`` as its callback task and passes a
    metadata dict (token symbol, token value, address, tag, ``tx_param``) as
    callback parameter.

    :param result: Transaction data; read with ``.get`` for the sender,
        recipient and source/destination token symbol and value keys.
    :type result: dict
    :param param: Transaction kind; compared against ``'transfer'``.
    :type param: str
    :param status_code: Status of the originating task, ``0`` meaning success.
    :type status_code: int
    :raises ValueError: If ``status_code`` is not ``0``.
    """
    if status_code != 0:
        raise ValueError(f'Unexpected status code: {status_code}.')

    chain_str = Chain.spec.__str__()
    balances_callback_task = 'cic_ussd.tasks.callback_handler.process_transaction_balances_callback'

    # recipient side: balances are always requested
    recipient_address = result.get('recipient')
    recipient_token_symbol = result.get('destination_token_symbol')
    recipient_metadata = {
        "token_symbol": recipient_token_symbol,
        "token_value": result.get('destination_token_value'),
        "blockchain_address": recipient_address,
        "tag": "recipient",
        "tx_param": param
    }
    get_balances(
        address=recipient_address,
        callback_param=recipient_metadata,
        chain_str=chain_str,
        callback_task=balances_callback_task,
        token_symbol=recipient_token_symbol,
        asynchronous=True)

    # sender side: only relevant when the transaction is a transfer
    if param != 'transfer':
        return

    sender_address = result.get('sender')
    sender_token_symbol = result.get('source_token_symbol')
    sender_metadata = {
        "blockchain_address": sender_address,
        "token_symbol": sender_token_symbol,
        "token_value": result.get('source_token_value'),
        "tag": "sender",
        "tx_param": param
    }
    get_balances(
        address=sender_address,
        callback_param=sender_metadata,
        chain_str=chain_str,
        callback_task=balances_callback_task,
        token_symbol=sender_token_symbol,
        asynchronous=True)
|
|
|
|
|
|
@celery_app.task(bind=True)
def process_transaction_balances_callback(self, result: list, param: dict, status_code: int):
    """Attach the operational balance to transaction metadata and start notification.

    The operational balance is computed from the first entry of ``result`` and
    stored under ``'operational_balance'`` in a copy of ``param``. Depending on
    ``param['tx_param']`` one of two task chains is dispatched on the queue
    taken from this task's routing key:

    * ``'transfer'``: query preferences metadata -> process tx metadata -> notify.
    * ``'tokengift'``: process tx metadata -> notify, with an empty dict passed
      as the first argument of the processing task (presumably standing in for
      the preferences the transfer chain would supply -- confirm against
      ``process_tx_metadata_for_notification``).

    Any other ``tx_param`` value dispatches nothing.

    :param result: Balances response; only the first element is used.
    :type result: list
    :param param: Transaction metadata; ``'blockchain_address'`` and
        ``'tx_param'`` are read here. The dict itself is left unmodified.
    :type param: dict
    :param status_code: Status of the originating task, ``0`` meaning success.
    :type status_code: int
    :raises ValueError: If ``status_code`` is not ``0``.
    """
    queue = self.request.delivery_info.get('routing_key')
    if status_code != 0:
        raise ValueError(f'Unexpected status code: {status_code}.')

    # retrieve balance data
    balances_data = result[0]
    operational_balance = compute_operational_balance(balances=balances_data)

    # append balance to a copy of the transaction metadata so the caller's
    # ``param`` dict is not mutated as a side effect
    transaction_metadata = {**param, 'operational_balance': operational_balance}

    # issue notification of transaction (last link of either chain)
    s_notify_account = celery.signature(
        'cic_ussd.tasks.notifications.notify_account_of_transaction',
        queue=queue
    )

    tx_param = param.get('tx_param')
    if tx_param == 'transfer':
        # retrieve account's preferences
        s_preferences_metadata = celery.signature(
            'cic_ussd.tasks.metadata.query_preferences_metadata',
            [param.get('blockchain_address')],
            queue=queue
        )
        # parse metadata and run validations
        s_process_account_metadata = celery.signature(
            'cic_ussd.tasks.processor.process_tx_metadata_for_notification',
            [transaction_metadata],
            queue=queue
        )
        celery.chain(s_preferences_metadata, s_process_account_metadata, s_notify_account).apply_async()
    elif tx_param == 'tokengift':
        # no preferences lookup in this chain: an empty dict is supplied as
        # the first positional argument instead
        s_process_account_metadata = celery.signature(
            'cic_ussd.tasks.processor.process_tx_metadata_for_notification',
            [{}, transaction_metadata],
            queue=queue
        )
        celery.chain(s_process_account_metadata, s_notify_account).apply_async()
|
|
|
|
|
|
@celery_app.task
def process_balances_callback(result: list, param: str, status_code: int):
    """Cache the balances returned for an account.

    The first element of ``result`` is serialized to JSON and cached under a
    key derived from its ``'address'`` value and the ``':cic.balances_data'``
    salt.

    :param result: Balances response; only the first element is used.
    :type result: list
    :param param: Callback parameter; not used by this task.
    :type param: str
    :param status_code: Status of the originating task, ``0`` meaning success.
    :type status_code: int
    :raises ValueError: If ``status_code`` is not ``0``.
    """
    if status_code != 0:
        raise ValueError(f'Unexpected status code: {status_code}.')

    account_balances = result[0]
    address = account_balances.get('address')
    # the first two characters are dropped before hex-decoding
    # (assumes a '0x' prefix -- TODO confirm)
    identifier = bytes.fromhex(address[2:])
    cache_key = create_cached_data_key(identifier=identifier, salt=':cic.balances_data')
    cache_data(key=cache_key, data=json.dumps(account_balances))
    logg.debug(f'caching: {account_balances} with key: {cache_key}')
|
|
|
|
|
|
# TODO: clean up this handler
|
|
def define_transaction_action_tag(
        preferred_language: str,
        sender_blockchain_address: str,
        param: str):
    """Pick the action tag and direction word describing a transaction.

    The transaction counts as outgoing when ``sender_blockchain_address``
    equals ``param``, and as incoming otherwise. English labels are used when
    ``preferred_language`` is exactly ``'en'``; any other value yields the
    Swahili labels.

    :param preferred_language: Language code of the account viewing the transaction.
    :type preferred_language: str
    :param sender_blockchain_address: Address of the transaction's sender.
    :type sender_blockchain_address: str
    :param param: Address the sender is compared against.
    :type param: str
    :return: ``(action_tag, direction)``
    :rtype: tuple
    """
    is_outgoing = bool(sender_blockchain_address == param)
    is_english = bool(preferred_language == 'en')

    # keyed by (is_outgoing, is_english)
    labels = {
        (True, True): ('SENT', 'TO'),
        (True, False): ('ULITUMA', 'KWA'),
        (False, True): ('RECEIVED', 'FROM'),
        (False, False): ('ULIPOKEA', 'KUTOKA'),
    }
    return labels[(is_outgoing, is_english)]
|
|
|
|
|
|
@celery_app.task
def process_statement_callback(result, param: str, status_code: int):
    """Build and cache a simplified account statement from a list of transactions.

    Transactions whose ``'source_token'`` contains the zero address are
    skipped. For every other transaction a dict is built with the sender's
    phone number (or ``'GRASSROOTS ECONOMICS'`` when the sender has no
    ``Account``), the action tag and direction (only when the sender has an
    ``Account``), the recipient's phone number (when the recipient has an
    ``Account``), the converted ``to_value``/``from_value`` and a formatted
    timestamp. The resulting list is JSON-serialized and cached under a key
    derived from ``param`` and the ``':cic.statement'`` salt.

    :param result: Iterable of transaction dicts.
    :param param: Blockchain address of the account the statement is for; its
        first two characters are dropped before hex-decoding (assumes a '0x'
        prefix -- TODO confirm).
    :type param: str
    :param status_code: Status of the originating task, ``0`` meaning success.
    :type status_code: int
    :raises ValueError: If ``status_code`` is not ``0``.
    :raises AttributeError: If a sender is known but no ``Account`` exists for
        ``param`` (its ``preferred_language`` cannot be read).
    """
    if status_code != 0:
        raise ValueError(f'Unexpected status code: {status_code}.')

    processed_transactions = []

    # one session for the whole statement, always released in ``finally``
    session = SessionBase.create_session()
    try:
        # the statement owner does not change between transactions: look it up once
        owner: Account = session.query(Account).filter_by(blockchain_address=param).first()

        # process transaction data to cache
        for transaction in result:
            sender_blockchain_address = transaction.get('sender')
            recipient_address = transaction.get('recipient')
            source_token = transaction.get('source_token')

            # filter out any transactions that are "gassy"
            if '0x0000000000000000000000000000000000000000' in source_token:
                continue

            # describe a processed transaction
            processed_transaction = {}

            # check if sender is in the system
            sender: Account = session.query(Account).filter_by(
                blockchain_address=sender_blockchain_address).first()
            if sender:
                processed_transaction['sender_phone_number'] = sender.phone_number

                # ``owner`` is dereferenced only here, so a missing owner
                # surfaces exactly when a tag has to be computed
                action_tag, direction = define_transaction_action_tag(
                    preferred_language=owner.preferred_language,
                    sender_blockchain_address=sender_blockchain_address,
                    param=param
                )
                processed_transaction['action_tag'] = action_tag
                processed_transaction['direction'] = direction
            else:
                processed_transaction['sender_phone_number'] = 'GRASSROOTS ECONOMICS'

            # check if recipient is in the system
            recipient: Account = session.query(Account).filter_by(
                blockchain_address=recipient_address).first()
            if recipient:
                processed_transaction['recipient_phone_number'] = recipient.phone_number
            else:
                logg.warning('Tx with recipient not found in cic-ussd')

            # add transaction values
            processed_transaction['to_value'] = str(from_wei(value=transaction.get('to_value')))
            processed_transaction['from_value'] = str(from_wei(value=transaction.get('from_value')))

            # NOTE: datetime.utcfromtimestamp is deprecated since Python 3.12;
            # kept because the module only imports ``datetime`` and ``timedelta``.
            raw_timestamp = transaction.get('timestamp')
            timestamp = datetime.utcfromtimestamp(raw_timestamp).strftime('%d/%m/%y, %H:%M')
            processed_transaction['timestamp'] = timestamp

            processed_transactions.append(processed_transaction)
    finally:
        session.close()

    # cache account statement
    identifier = bytes.fromhex(param[2:])
    key = create_cached_data_key(identifier=identifier, salt=':cic.statement')
    data = json.dumps(processed_transactions)

    # cache statement data
    cache_data(key=key, data=data)
|