cic-internal-integration/apps/cic-ussd/cic_ussd/tasks/metadata.py

93 lines
3.0 KiB
Python
Raw Permalink Normal View History

# standard imports
2021-10-15 13:21:41 +02:00
import json
import logging
2021-04-14 11:00:10 +02:00
# third-party imports
import celery
2021-10-15 13:21:41 +02:00
from cic_types.models.person import Person
# local imports
2021-08-06 18:29:01 +02:00
from cic_ussd.metadata import CustomMetadata, PersonMetadata, PhonePointerMetadata, PreferencesMetadata
from cic_ussd.tasks.base import CriticalMetadataTask
celery_app = celery.current_app
2021-10-15 13:21:41 +02:00
logg = logging.getLogger(__file__)
@celery_app.task
2021-04-14 11:00:10 +02:00
def query_person_metadata(blockchain_address: str):
"""
:param blockchain_address:
:type blockchain_address:
:return:
:rtype:
"""
2021-10-07 17:12:35 +02:00
identifier = bytes.fromhex(blockchain_address)
2021-04-14 11:00:10 +02:00
person_metadata_client = PersonMetadata(identifier=identifier)
2021-10-15 13:21:41 +02:00
response = person_metadata_client.query()
data = response.json()
person = Person()
person_data = person.deserialize(person_data=data)
serialized_person_data = person_data.serialize()
data = json.dumps(serialized_person_data)
person_metadata_client.cache_metadata(data=data)
@celery_app.task
2021-04-14 11:00:10 +02:00
def create_person_metadata(blockchain_address: str, data: dict):
"""
:param blockchain_address:
:type blockchain_address:
:param data:
:type data:
:return:
:rtype:
"""
2021-10-07 17:12:35 +02:00
identifier = bytes.fromhex(blockchain_address)
2021-04-14 11:00:10 +02:00
person_metadata_client = PersonMetadata(identifier=identifier)
person_metadata_client.create(data=data)
@celery_app.task
2021-06-23 10:54:34 +02:00
def edit_person_metadata(blockchain_address: str, data: dict):
2021-10-07 17:12:35 +02:00
identifier = bytes.fromhex(blockchain_address)
2021-04-14 11:00:10 +02:00
person_metadata_client = PersonMetadata(identifier=identifier)
person_metadata_client.edit(data=data)
@celery_app.task(bind=True, base=CriticalMetadataTask)
2021-04-14 11:00:10 +02:00
def add_phone_pointer(self, blockchain_address: str, phone_number: str):
identifier = phone_number.encode('utf-8')
2021-10-07 17:12:35 +02:00
stripped_address = blockchain_address
2021-04-14 11:00:10 +02:00
phone_metadata_client = PhonePointerMetadata(identifier=identifier)
phone_metadata_client.create(data=stripped_address)
2021-06-23 10:54:34 +02:00
@celery_app.task()
def add_custom_metadata(blockchain_address: str, data: dict):
2021-10-07 17:12:35 +02:00
identifier = bytes.fromhex(blockchain_address)
2021-06-23 10:54:34 +02:00
custom_metadata_client = CustomMetadata(identifier=identifier)
custom_metadata_client.create(data=data)
@celery_app.task()
def add_preferences_metadata(blockchain_address: str, data: dict):
2021-10-07 17:12:35 +02:00
identifier = bytes.fromhex(blockchain_address)
2021-08-06 18:29:01 +02:00
preferences_metadata_client = PreferencesMetadata(identifier=identifier)
preferences_metadata_client.create(data=data)
2021-07-20 18:18:27 +02:00
@celery_app.task()
def query_preferences_metadata(blockchain_address: str):
"""This method retrieves preferences metadata based on an account's blockchain address.
:param blockchain_address: Blockchain address of an account.
:type blockchain_address: str | Ox-hex
"""
2021-10-07 17:12:35 +02:00
identifier = bytes.fromhex(blockchain_address)
2021-10-15 13:21:41 +02:00
logg.debug(f'retrieving preferences metadata for address: {blockchain_address}.')
preferences_metadata_client = PreferencesMetadata(identifier=identifier)
response = preferences_metadata_client.query()
data = json.dumps(response.json())
preferences_metadata_client.cache_metadata(data)
return data