Merge branch 'philip/leaner-metadata-handling' into 'master'
Philip/leaner metadata handling See merge request grassrootseconomics/cic-internal-integration!94
This commit is contained in:
commit
bee602b16a
@ -1,7 +1,126 @@
|
|||||||
|
# standard imports
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
from typing import Dict, Union
|
||||||
|
|
||||||
|
# third-part imports
|
||||||
|
import requests
|
||||||
|
from cic_types.models.person import generate_metadata_pointer, Person
|
||||||
|
|
||||||
|
# local imports
|
||||||
|
from cic_ussd.metadata import make_request
|
||||||
|
from cic_ussd.metadata.signer import Signer
|
||||||
|
from cic_ussd.redis import cache_data
|
||||||
|
from cic_ussd.error import MetadataStoreError
|
||||||
|
|
||||||
|
|
||||||
|
logg = logging.getLogger().getChild(__name__)
|
||||||
|
|
||||||
|
|
||||||
class Metadata:
|
class Metadata:
|
||||||
"""
|
"""
|
||||||
:cvar base_url:
|
:cvar base_url: The base url or the metadata server.
|
||||||
:type base_url:
|
:type base_url: str
|
||||||
"""
|
"""
|
||||||
|
|
||||||
base_url = None
|
base_url = None
|
||||||
|
|
||||||
|
|
||||||
|
def metadata_http_error_handler(result: requests.Response):
|
||||||
|
""" This function handles and appropriately raises errors from http requests interacting with the metadata server.
|
||||||
|
:param result: The response object from a http request.
|
||||||
|
:type result: requests.Response
|
||||||
|
"""
|
||||||
|
status_code = result.status_code
|
||||||
|
|
||||||
|
if 100 <= status_code < 200:
|
||||||
|
raise MetadataStoreError(f'Informational errors: {status_code}, reason: {result.reason}')
|
||||||
|
|
||||||
|
elif 300 <= status_code < 400:
|
||||||
|
raise MetadataStoreError(f'Redirect Issues: {status_code}, reason: {result.reason}')
|
||||||
|
|
||||||
|
elif 400 <= status_code < 500:
|
||||||
|
raise MetadataStoreError(f'Client Error: {status_code}, reason: {result.reason}')
|
||||||
|
|
||||||
|
elif 500 <= status_code < 600:
|
||||||
|
raise MetadataStoreError(f'Server Error: {status_code}, reason: {result.reason}')
|
||||||
|
|
||||||
|
|
||||||
|
class MetadataRequestsHandler(Metadata):
|
||||||
|
|
||||||
|
def __init__(self, cic_type: str, identifier: bytes, engine: str = 'pgp'):
|
||||||
|
"""
|
||||||
|
:param cic_type: The salt value with which to hash a specific metadata identifier.
|
||||||
|
:type cic_type: str
|
||||||
|
:param engine: Encryption used for sending data to the metadata server.
|
||||||
|
:type engine: str
|
||||||
|
:param identifier: A unique element of data in bytes necessary for creating a metadata pointer.
|
||||||
|
:type identifier: bytes
|
||||||
|
"""
|
||||||
|
self.cic_type = cic_type
|
||||||
|
self.engine = engine
|
||||||
|
self.headers = {
|
||||||
|
'X-CIC-AUTOMERGE': 'server',
|
||||||
|
'Content-Type': 'application/json'
|
||||||
|
}
|
||||||
|
self.identifier = identifier
|
||||||
|
self.metadata_pointer = generate_metadata_pointer(
|
||||||
|
identifier=self.identifier,
|
||||||
|
cic_type=self.cic_type
|
||||||
|
)
|
||||||
|
if self.base_url:
|
||||||
|
self.url = os.path.join(self.base_url, self.metadata_pointer)
|
||||||
|
|
||||||
|
def create(self, data: Union[Dict, str]):
|
||||||
|
""" This function is responsible for posting data to the metadata server with a corresponding metadata pointer
|
||||||
|
for storage.
|
||||||
|
:param data: The data to be stored in the metadata server.
|
||||||
|
:type data: dict|str
|
||||||
|
"""
|
||||||
|
data = json.dumps(data).encode('utf-8')
|
||||||
|
result = make_request(method='POST', url=self.url, data=data, headers=self.headers)
|
||||||
|
metadata_http_error_handler(result=result)
|
||||||
|
metadata = result.content
|
||||||
|
self.edit(data=metadata)
|
||||||
|
|
||||||
|
def edit(self, data: bytes):
|
||||||
|
""" This function is responsible for editing data in the metadata server corresponding to a unique pointer.
|
||||||
|
:param data: The data to be edited in the metadata server.
|
||||||
|
:type data: bytes
|
||||||
|
"""
|
||||||
|
cic_meta_signer = Signer()
|
||||||
|
signature = cic_meta_signer.sign_digest(data=data)
|
||||||
|
algorithm = cic_meta_signer.get_operational_key().get('algo')
|
||||||
|
decoded_data = data.decode('utf-8')
|
||||||
|
formatted_data = {
|
||||||
|
'm': data.decode('utf-8'),
|
||||||
|
's': {
|
||||||
|
'engine': self.engine,
|
||||||
|
'algo': algorithm,
|
||||||
|
'data': signature,
|
||||||
|
'digest': json.loads(data).get('digest'),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
formatted_data = json.dumps(formatted_data).encode('utf-8')
|
||||||
|
result = make_request(method='PUT', url=self.url, data=formatted_data, headers=self.headers)
|
||||||
|
logg.info(f'signed metadata submission status: {result.status_code}.')
|
||||||
|
metadata_http_error_handler(result=result)
|
||||||
|
try:
|
||||||
|
decoded_identifier = self.identifier.decode("utf-8")
|
||||||
|
except UnicodeDecodeError:
|
||||||
|
decoded_identifier = self.identifier.hex()
|
||||||
|
logg.info(f'identifier: {decoded_identifier}. metadata pointer: {self.metadata_pointer} set to: {decoded_data}.')
|
||||||
|
|
||||||
|
def query(self):
|
||||||
|
"""This function is responsible for querying the metadata server for data corresponding to a unique pointer."""
|
||||||
|
result = make_request(method='GET', url=self.url)
|
||||||
|
metadata_http_error_handler(result=result)
|
||||||
|
response_data = result.content
|
||||||
|
data = json.loads(response_data.decode('utf-8'))
|
||||||
|
if result.status_code == 200 and self.cic_type == 'cic.person':
|
||||||
|
person = Person()
|
||||||
|
deserialized_person = person.deserialize(person_data=json.loads(data))
|
||||||
|
data = json.dumps(deserialized_person.serialize())
|
||||||
|
cache_data(self.metadata_pointer, data=data)
|
||||||
|
logg.debug(f'caching: {data} with key: {self.metadata_pointer}')
|
||||||
|
12
apps/cic-ussd/cic_ussd/metadata/person.py
Normal file
12
apps/cic-ussd/cic_ussd/metadata/person.py
Normal file
@ -0,0 +1,12 @@
|
|||||||
|
# standard imports
|
||||||
|
|
||||||
|
# third-party imports
|
||||||
|
|
||||||
|
# local imports
|
||||||
|
from .base import MetadataRequestsHandler
|
||||||
|
|
||||||
|
|
||||||
|
class PersonMetadata(MetadataRequestsHandler):
|
||||||
|
|
||||||
|
def __init__(self, identifier: bytes):
|
||||||
|
super().__init__(cic_type='cic.person', identifier=identifier)
|
@ -1,85 +1,13 @@
|
|||||||
# standard imports
|
# standard imports
|
||||||
import json
|
|
||||||
import logging
|
import logging
|
||||||
import os
|
|
||||||
|
|
||||||
# external imports
|
# external imports
|
||||||
import requests
|
|
||||||
from cic_types.models.person import generate_metadata_pointer
|
|
||||||
from cic_ussd.metadata import make_request
|
|
||||||
from cic_ussd.metadata.signer import Signer
|
|
||||||
|
|
||||||
# local imports
|
# local imports
|
||||||
from cic_ussd.error import MetadataStoreError
|
from .base import MetadataRequestsHandler
|
||||||
from .base import Metadata
|
|
||||||
|
|
||||||
logg = logging.getLogger().getChild(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
class PhonePointerMetadata(Metadata):
|
class PhonePointerMetadata(MetadataRequestsHandler):
|
||||||
|
|
||||||
def __init__(self, identifier: bytes, engine: str):
|
|
||||||
"""
|
|
||||||
:param identifier:
|
|
||||||
:type identifier:
|
|
||||||
"""
|
|
||||||
|
|
||||||
self.headers = {
|
|
||||||
'X-CIC-AUTOMERGE': 'server',
|
|
||||||
'Content-Type': 'application/json'
|
|
||||||
}
|
|
||||||
self.identifier = identifier
|
|
||||||
self.metadata_pointer = generate_metadata_pointer(
|
|
||||||
identifier=self.identifier,
|
|
||||||
cic_type=':cic.phone'
|
|
||||||
)
|
|
||||||
if self.base_url:
|
|
||||||
self.url = os.path.join(self.base_url, self.metadata_pointer)
|
|
||||||
|
|
||||||
self.engine = engine
|
|
||||||
|
|
||||||
|
|
||||||
def create(self, data: str):
|
|
||||||
try:
|
|
||||||
data = json.dumps(data).encode('utf-8')
|
|
||||||
result = make_request(method='POST', url=self.url, data=data, headers=self.headers)
|
|
||||||
metadata = result.content
|
|
||||||
logg.debug('data {} meta {} resp {} stats {}'.format(data, metadata, result.reason, result.status_code))
|
|
||||||
self.edit(data=metadata, engine=self.engine)
|
|
||||||
result.raise_for_status()
|
|
||||||
except requests.exceptions.HTTPError as error:
|
|
||||||
raise MetadataStoreError(error)
|
|
||||||
|
|
||||||
|
|
||||||
def edit(self, data: bytes, engine: str):
|
|
||||||
"""
|
|
||||||
:param data:
|
|
||||||
:type data:
|
|
||||||
:param engine:
|
|
||||||
:type engine:
|
|
||||||
:return:
|
|
||||||
:rtype:
|
|
||||||
"""
|
|
||||||
cic_meta_signer = Signer()
|
|
||||||
signature = cic_meta_signer.sign_digest(data=data)
|
|
||||||
algorithm = cic_meta_signer.get_operational_key().get('algo')
|
|
||||||
decoded_data = data.decode('utf-8')
|
|
||||||
formatted_data = {
|
|
||||||
'm': decoded_data,
|
|
||||||
's': {
|
|
||||||
'engine': engine,
|
|
||||||
'algo': algorithm,
|
|
||||||
'data': signature,
|
|
||||||
'digest': json.loads(data).get('digest'),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
formatted_data = json.dumps(formatted_data).encode('utf-8')
|
|
||||||
|
|
||||||
try:
|
|
||||||
result = make_request(method='PUT', url=self.url, data=formatted_data, headers=self.headers)
|
|
||||||
logg.debug(f'signed phone pointer metadata submission status: {result.status_code}.')
|
|
||||||
result.raise_for_status()
|
|
||||||
logg.info('phone {} metadata pointer {} set to {}'.format(self.identifier.decode('utf-8'), self.metadata_pointer, decoded_data))
|
|
||||||
except requests.exceptions.HTTPError as error:
|
|
||||||
raise MetadataStoreError(error)
|
|
||||||
|
|
||||||
|
def __init__(self, identifier: bytes):
|
||||||
|
super().__init__(cic_type='cic.msisdn', identifier=identifier)
|
||||||
|
@ -1,100 +0,0 @@
|
|||||||
# standard imports
|
|
||||||
import json
|
|
||||||
import logging
|
|
||||||
import os
|
|
||||||
|
|
||||||
# third-party imports
|
|
||||||
import requests
|
|
||||||
from cic_types.models.person import generate_metadata_pointer, Person
|
|
||||||
|
|
||||||
# local imports
|
|
||||||
from cic_ussd.chain import Chain
|
|
||||||
from cic_ussd.metadata import make_request
|
|
||||||
from cic_ussd.metadata.signer import Signer
|
|
||||||
from cic_ussd.redis import cache_data
|
|
||||||
from cic_ussd.error import MetadataStoreError
|
|
||||||
from .base import Metadata
|
|
||||||
|
|
||||||
logg = logging.getLogger()
|
|
||||||
|
|
||||||
|
|
||||||
class UserMetadata(Metadata):
|
|
||||||
|
|
||||||
|
|
||||||
def __init__(self, identifier: bytes):
|
|
||||||
"""
|
|
||||||
:param identifier:
|
|
||||||
:type identifier:
|
|
||||||
"""
|
|
||||||
self.headers = {
|
|
||||||
'X-CIC-AUTOMERGE': 'server',
|
|
||||||
'Content-Type': 'application/json'
|
|
||||||
}
|
|
||||||
self.identifier = identifier
|
|
||||||
self.metadata_pointer = generate_metadata_pointer(
|
|
||||||
identifier=self.identifier,
|
|
||||||
cic_type=':cic.person'
|
|
||||||
)
|
|
||||||
if self.base_url:
|
|
||||||
self.url = os.path.join(self.base_url, self.metadata_pointer)
|
|
||||||
|
|
||||||
def create(self, data: dict):
|
|
||||||
try:
|
|
||||||
data = json.dumps(data).encode('utf-8')
|
|
||||||
result = make_request(method='POST', url=self.url, data=data, headers=self.headers)
|
|
||||||
metadata = result.content
|
|
||||||
self.edit(data=metadata, engine='pgp')
|
|
||||||
logg.info(f'Get sign material response status: {result.status_code}')
|
|
||||||
result.raise_for_status()
|
|
||||||
except requests.exceptions.HTTPError as error:
|
|
||||||
raise MetadataStoreError(error)
|
|
||||||
|
|
||||||
def edit(self, data: bytes, engine: str):
|
|
||||||
"""
|
|
||||||
:param data:
|
|
||||||
:type data:
|
|
||||||
:param engine:
|
|
||||||
:type engine:
|
|
||||||
:return:
|
|
||||||
:rtype:
|
|
||||||
"""
|
|
||||||
cic_meta_signer = Signer()
|
|
||||||
signature = cic_meta_signer.sign_digest(data=data)
|
|
||||||
algorithm = cic_meta_signer.get_operational_key().get('algo')
|
|
||||||
formatted_data = {
|
|
||||||
'm': data.decode('utf-8'),
|
|
||||||
's': {
|
|
||||||
'engine': engine,
|
|
||||||
'algo': algorithm,
|
|
||||||
'data': signature,
|
|
||||||
'digest': json.loads(data).get('digest'),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
formatted_data = json.dumps(formatted_data).encode('utf-8')
|
|
||||||
|
|
||||||
try:
|
|
||||||
result = make_request(method='PUT', url=self.url, data=formatted_data, headers=self.headers)
|
|
||||||
logg.debug(f'signed user metadata submission status: {result.status_code}.')
|
|
||||||
result.raise_for_status()
|
|
||||||
except requests.exceptions.HTTPError as error:
|
|
||||||
raise MetadataStoreError(error)
|
|
||||||
|
|
||||||
def query(self):
|
|
||||||
result = make_request(method='GET', url=self.url)
|
|
||||||
status = result.status_code
|
|
||||||
logg.info(f'Get latest data status: {status}')
|
|
||||||
try:
|
|
||||||
if status == 200:
|
|
||||||
response_data = result.content
|
|
||||||
data = json.loads(response_data.decode())
|
|
||||||
|
|
||||||
# validate data
|
|
||||||
person = Person()
|
|
||||||
deserialized_person = person.deserialize(person_data=json.loads(data))
|
|
||||||
|
|
||||||
cache_data(key=self.metadata_pointer, data=json.dumps(deserialized_person.serialize()))
|
|
||||||
elif status == 404:
|
|
||||||
logg.info('The data is not available and might need to be added.')
|
|
||||||
result.raise_for_status()
|
|
||||||
except requests.exceptions.HTTPError as error:
|
|
||||||
raise MetadataNotFoundError(error)
|
|
@ -235,7 +235,7 @@ def process_display_user_metadata(user: User, display_key: str):
|
|||||||
products=products
|
products=products
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
raise MetadataNotFoundError(f'Expected user metadata but found none in cache for key: {user.blockchain_address}')
|
raise MetadataNotFoundError(f'Expected person metadata but found none in cache for key: {key}')
|
||||||
|
|
||||||
|
|
||||||
def process_account_statement(user: User, display_key: str, ussd_session: dict):
|
def process_account_statement(user: User, display_key: str, ussd_session: dict):
|
||||||
@ -331,11 +331,11 @@ def process_start_menu(display_key: str, user: User):
|
|||||||
operational_balance = compute_operational_balance(balances=balances_data)
|
operational_balance = compute_operational_balance(balances=balances_data)
|
||||||
|
|
||||||
# retrieve and cache account's metadata
|
# retrieve and cache account's metadata
|
||||||
s_query_user_metadata = celery.signature(
|
s_query_person_metadata = celery.signature(
|
||||||
'cic_ussd.tasks.metadata.query_user_metadata',
|
'cic_ussd.tasks.metadata.query_person_metadata',
|
||||||
[blockchain_address]
|
[blockchain_address]
|
||||||
)
|
)
|
||||||
s_query_user_metadata.apply_async(queue='cic-ussd')
|
s_query_person_metadata.apply_async(queue='cic-ussd')
|
||||||
|
|
||||||
# retrieve and cache account's statement
|
# retrieve and cache account's statement
|
||||||
retrieve_account_statement(blockchain_address=blockchain_address)
|
retrieve_account_statement(blockchain_address=blockchain_address)
|
||||||
@ -389,14 +389,11 @@ def process_request(user_input: str, user: User, ussd_session: Optional[dict] =
|
|||||||
identifier=blockchain_address_to_metadata_pointer(blockchain_address=user.blockchain_address),
|
identifier=blockchain_address_to_metadata_pointer(blockchain_address=user.blockchain_address),
|
||||||
cic_type='cic.person'
|
cic_type='cic.person'
|
||||||
)
|
)
|
||||||
logg.debug(f'METADATA POINTER: {key}')
|
person_metadata = get_cached_data(key=key)
|
||||||
user_metadata = get_cached_data(key=key)
|
|
||||||
logg.debug(f'METADATA: {user_metadata}')
|
|
||||||
|
|
||||||
if last_ussd_session:
|
if last_ussd_session:
|
||||||
# get last state
|
# get last state
|
||||||
last_state = last_ussd_session.state
|
last_state = last_ussd_session.state
|
||||||
logg.debug(f'LAST USSD SESSION STATE: {last_state}')
|
|
||||||
# if last state is account_creation_prompt and metadata exists, show start menu
|
# if last state is account_creation_prompt and metadata exists, show start menu
|
||||||
if last_state in [
|
if last_state in [
|
||||||
'account_creation_prompt',
|
'account_creation_prompt',
|
||||||
@ -405,7 +402,7 @@ def process_request(user_input: str, user: User, ussd_session: Optional[dict] =
|
|||||||
'exit_invalid_new_pin',
|
'exit_invalid_new_pin',
|
||||||
'exit_pin_mismatch',
|
'exit_pin_mismatch',
|
||||||
'exit_invalid_request'
|
'exit_invalid_request'
|
||||||
] and user_metadata is not None:
|
] and person_metadata is not None:
|
||||||
return UssdMenu.find_by_name(name='start')
|
return UssdMenu.find_by_name(name='start')
|
||||||
else:
|
else:
|
||||||
return UssdMenu.find_by_name(name=last_state)
|
return UssdMenu.find_by_name(name=last_state)
|
||||||
|
@ -97,11 +97,11 @@ def retrieve_recipient_metadata(state_machine_data: Tuple[str, dict, User]):
|
|||||||
recipient = get_user_by_phone_number(phone_number=user_input)
|
recipient = get_user_by_phone_number(phone_number=user_input)
|
||||||
blockchain_address = recipient.blockchain_address
|
blockchain_address = recipient.blockchain_address
|
||||||
# retrieve and cache account's metadata
|
# retrieve and cache account's metadata
|
||||||
s_query_user_metadata = celery.signature(
|
s_query_person_metadata = celery.signature(
|
||||||
'cic_ussd.tasks.metadata.query_user_metadata',
|
'cic_ussd.tasks.metadata.query_person_metadata',
|
||||||
[blockchain_address]
|
[blockchain_address]
|
||||||
)
|
)
|
||||||
s_query_user_metadata.apply_async(queue='cic-ussd')
|
s_query_person_metadata.apply_async(queue='cic-ussd')
|
||||||
|
|
||||||
|
|
||||||
def save_transaction_amount_to_session_data(state_machine_data: Tuple[str, dict, User]):
|
def save_transaction_amount_to_session_data(state_machine_data: Tuple[str, dict, User]):
|
||||||
|
@ -164,11 +164,11 @@ def save_complete_user_metadata(state_machine_data: Tuple[str, dict, User]):
|
|||||||
user_metadata = format_user_metadata(metadata=metadata, user=user)
|
user_metadata = format_user_metadata(metadata=metadata, user=user)
|
||||||
|
|
||||||
blockchain_address = user.blockchain_address
|
blockchain_address = user.blockchain_address
|
||||||
s_create_user_metadata = celery.signature(
|
s_create_person_metadata = celery.signature(
|
||||||
'cic_ussd.tasks.metadata.create_user_metadata',
|
'cic_ussd.tasks.metadata.create_person_metadata',
|
||||||
[blockchain_address, user_metadata]
|
[blockchain_address, user_metadata]
|
||||||
)
|
)
|
||||||
s_create_user_metadata.apply_async(queue='cic-ussd')
|
s_create_person_metadata.apply_async(queue='cic-ussd')
|
||||||
|
|
||||||
|
|
||||||
def edit_user_metadata_attribute(state_machine_data: Tuple[str, dict, User]):
|
def edit_user_metadata_attribute(state_machine_data: Tuple[str, dict, User]):
|
||||||
@ -211,18 +211,18 @@ def edit_user_metadata_attribute(state_machine_data: Tuple[str, dict, User]):
|
|||||||
|
|
||||||
edited_metadata = deserialized_person.serialize()
|
edited_metadata = deserialized_person.serialize()
|
||||||
|
|
||||||
s_edit_user_metadata = celery.signature(
|
s_edit_person_metadata = celery.signature(
|
||||||
'cic_ussd.tasks.metadata.edit_user_metadata',
|
'cic_ussd.tasks.metadata.edit_person_metadata',
|
||||||
[blockchain_address, edited_metadata, 'pgp']
|
[blockchain_address, edited_metadata]
|
||||||
)
|
)
|
||||||
s_edit_user_metadata.apply_async(queue='cic-ussd')
|
s_edit_person_metadata.apply_async(queue='cic-ussd')
|
||||||
|
|
||||||
|
|
||||||
def get_user_metadata(state_machine_data: Tuple[str, dict, User]):
|
def get_user_metadata(state_machine_data: Tuple[str, dict, User]):
|
||||||
user_input, ussd_session, user = state_machine_data
|
user_input, ussd_session, user = state_machine_data
|
||||||
blockchain_address = user.blockchain_address
|
blockchain_address = user.blockchain_address
|
||||||
s_get_user_metadata = celery.signature(
|
s_get_user_metadata = celery.signature(
|
||||||
'cic_ussd.tasks.metadata.query_user_metadata',
|
'cic_ussd.tasks.metadata.query_person_metadata',
|
||||||
[blockchain_address]
|
[blockchain_address]
|
||||||
)
|
)
|
||||||
s_get_user_metadata.apply_async(queue='cic-ussd')
|
s_get_user_metadata.apply_async(queue='cic-ussd')
|
||||||
|
@ -57,14 +57,9 @@ def process_account_creation_callback(self, result: str, url: str, status_code:
|
|||||||
queue = self.request.delivery_info.get('routing_key')
|
queue = self.request.delivery_info.get('routing_key')
|
||||||
s = celery.signature(
|
s = celery.signature(
|
||||||
'cic_ussd.tasks.metadata.add_phone_pointer',
|
'cic_ussd.tasks.metadata.add_phone_pointer',
|
||||||
[
|
[result, phone_number]
|
||||||
result,
|
|
||||||
phone_number,
|
|
||||||
'pgp',
|
|
||||||
],
|
|
||||||
queue=queue,
|
|
||||||
)
|
)
|
||||||
s.apply_async()
|
s.apply_async(queue=queue)
|
||||||
|
|
||||||
# expire cache
|
# expire cache
|
||||||
cache.expire(task_id, timedelta(seconds=180))
|
cache.expire(task_id, timedelta(seconds=180))
|
||||||
|
@ -1,14 +1,13 @@
|
|||||||
# standard imports
|
# standard imports
|
||||||
import json
|
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
# external imports
|
# third-party imports
|
||||||
import celery
|
import celery
|
||||||
from hexathon import strip_0x
|
from hexathon import strip_0x
|
||||||
|
|
||||||
# local imports
|
# local imports
|
||||||
from cic_ussd.metadata import blockchain_address_to_metadata_pointer
|
from cic_ussd.metadata import blockchain_address_to_metadata_pointer
|
||||||
from cic_ussd.metadata.user import UserMetadata
|
from cic_ussd.metadata.person import PersonMetadata
|
||||||
from cic_ussd.metadata.phone import PhonePointerMetadata
|
from cic_ussd.metadata.phone import PhonePointerMetadata
|
||||||
from cic_ussd.tasks.base import CriticalMetadataTask
|
from cic_ussd.tasks.base import CriticalMetadataTask
|
||||||
|
|
||||||
@ -17,7 +16,7 @@ logg = logging.getLogger().getChild(__name__)
|
|||||||
|
|
||||||
|
|
||||||
@celery_app.task
|
@celery_app.task
|
||||||
def query_user_metadata(blockchain_address: str):
|
def query_person_metadata(blockchain_address: str):
|
||||||
"""
|
"""
|
||||||
:param blockchain_address:
|
:param blockchain_address:
|
||||||
:type blockchain_address:
|
:type blockchain_address:
|
||||||
@ -25,12 +24,12 @@ def query_user_metadata(blockchain_address: str):
|
|||||||
:rtype:
|
:rtype:
|
||||||
"""
|
"""
|
||||||
identifier = blockchain_address_to_metadata_pointer(blockchain_address=blockchain_address)
|
identifier = blockchain_address_to_metadata_pointer(blockchain_address=blockchain_address)
|
||||||
user_metadata_client = UserMetadata(identifier=identifier)
|
person_metadata_client = PersonMetadata(identifier=identifier)
|
||||||
user_metadata_client.query()
|
person_metadata_client.query()
|
||||||
|
|
||||||
|
|
||||||
@celery_app.task
|
@celery_app.task
|
||||||
def create_user_metadata(blockchain_address: str, data: dict):
|
def create_person_metadata(blockchain_address: str, data: dict):
|
||||||
"""
|
"""
|
||||||
:param blockchain_address:
|
:param blockchain_address:
|
||||||
:type blockchain_address:
|
:type blockchain_address:
|
||||||
@ -40,19 +39,20 @@ def create_user_metadata(blockchain_address: str, data: dict):
|
|||||||
:rtype:
|
:rtype:
|
||||||
"""
|
"""
|
||||||
identifier = blockchain_address_to_metadata_pointer(blockchain_address=blockchain_address)
|
identifier = blockchain_address_to_metadata_pointer(blockchain_address=blockchain_address)
|
||||||
user_metadata_client = UserMetadata(identifier=identifier)
|
person_metadata_client = PersonMetadata(identifier=identifier)
|
||||||
user_metadata_client.create(data=data)
|
person_metadata_client.create(data=data)
|
||||||
|
|
||||||
|
|
||||||
@celery_app.task
|
@celery_app.task
|
||||||
def edit_user_metadata(blockchain_address: str, data: bytes, engine: str):
|
def edit_person_metadata(blockchain_address: str, data: bytes):
|
||||||
identifier = blockchain_address_to_metadata_pointer(blockchain_address=blockchain_address)
|
identifier = blockchain_address_to_metadata_pointer(blockchain_address=blockchain_address)
|
||||||
user_metadata_client = UserMetadata(identifier=identifier)
|
person_metadata_client = PersonMetadata(identifier=identifier)
|
||||||
user_metadata_client.edit(data=data, engine=engine)
|
person_metadata_client.edit(data=data)
|
||||||
|
|
||||||
|
|
||||||
@celery_app.task(bind=True, base=CriticalMetadataTask)
|
@celery_app.task(bind=True, base=CriticalMetadataTask)
|
||||||
def add_phone_pointer(self, blockchain_address: str, phone: str, engine: str):
|
def add_phone_pointer(self, blockchain_address: str, phone_number: str):
|
||||||
|
identifier = phone_number.encode('utf-8')
|
||||||
stripped_address = strip_0x(blockchain_address)
|
stripped_address = strip_0x(blockchain_address)
|
||||||
phone_metadata_client = PhonePointerMetadata(identifier=phone.encode('utf-8'), engine=engine)
|
phone_metadata_client = PhonePointerMetadata(identifier=identifier)
|
||||||
phone_metadata_client.create(data=stripped_address)
|
phone_metadata_client.create(data=stripped_address)
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
cic_base[full_graph]~=0.1.2a67
|
cic_base[full_graph]~=0.1.2a68
|
||||||
cic-eth~=0.11.0b3
|
cic-eth~=0.11.0b3
|
||||||
cic-notify~=0.4.0a3
|
cic-notify~=0.4.0a3
|
||||||
cic-types~=0.1.0a10
|
cic-types~=0.1.0a10
|
||||||
|
@ -9,26 +9,26 @@ from cic_types.models.person import generate_metadata_pointer
|
|||||||
# local imports
|
# local imports
|
||||||
from cic_ussd.metadata import blockchain_address_to_metadata_pointer
|
from cic_ussd.metadata import blockchain_address_to_metadata_pointer
|
||||||
from cic_ussd.metadata.signer import Signer
|
from cic_ussd.metadata.signer import Signer
|
||||||
from cic_ussd.metadata.user import UserMetadata
|
from cic_ussd.metadata.person import PersonMetadata
|
||||||
from cic_ussd.redis import get_cached_data
|
from cic_ussd.redis import get_cached_data
|
||||||
|
|
||||||
|
|
||||||
def test_user_metadata(create_activated_user, define_metadata_pointer_url, load_config):
|
def test_user_metadata(create_activated_user, define_metadata_pointer_url, load_config):
|
||||||
UserMetadata.base_url = load_config.get('CIC_META_URL')
|
PersonMetadata.base_url = load_config.get('CIC_META_URL')
|
||||||
identifier = blockchain_address_to_metadata_pointer(blockchain_address=create_activated_user.blockchain_address)
|
identifier = blockchain_address_to_metadata_pointer(blockchain_address=create_activated_user.blockchain_address)
|
||||||
user_metadata_client = UserMetadata(identifier=identifier)
|
person_metadata_client = PersonMetadata(identifier=identifier)
|
||||||
|
|
||||||
assert user_metadata_client.url == define_metadata_pointer_url
|
assert person_metadata_client.url == define_metadata_pointer_url
|
||||||
|
|
||||||
|
|
||||||
def test_create_user_metadata(caplog,
|
def test_create_person_metadata(caplog,
|
||||||
create_activated_user,
|
create_activated_user,
|
||||||
define_metadata_pointer_url,
|
define_metadata_pointer_url,
|
||||||
load_config,
|
load_config,
|
||||||
mock_meta_post_response,
|
mock_meta_post_response,
|
||||||
person_metadata):
|
person_metadata):
|
||||||
identifier = blockchain_address_to_metadata_pointer(blockchain_address=create_activated_user.blockchain_address)
|
identifier = blockchain_address_to_metadata_pointer(blockchain_address=create_activated_user.blockchain_address)
|
||||||
user_metadata_client = UserMetadata(identifier=identifier)
|
person_metadata_client = PersonMetadata(identifier=identifier)
|
||||||
|
|
||||||
with requests_mock.Mocker(real_http=False) as request_mocker:
|
with requests_mock.Mocker(real_http=False) as request_mocker:
|
||||||
request_mocker.register_uri(
|
request_mocker.register_uri(
|
||||||
@ -38,7 +38,7 @@ def test_create_user_metadata(caplog,
|
|||||||
reason='CREATED',
|
reason='CREATED',
|
||||||
content=json.dumps(mock_meta_post_response).encode('utf-8')
|
content=json.dumps(mock_meta_post_response).encode('utf-8')
|
||||||
)
|
)
|
||||||
user_metadata_client.create(data=person_metadata)
|
person_metadata_client.create(data=person_metadata)
|
||||||
assert 'Get signed material response status: 201' in caplog.text
|
assert 'Get signed material response status: 201' in caplog.text
|
||||||
|
|
||||||
with pytest.raises(RuntimeError) as error:
|
with pytest.raises(RuntimeError) as error:
|
||||||
@ -49,11 +49,11 @@ def test_create_user_metadata(caplog,
|
|||||||
status_code=400,
|
status_code=400,
|
||||||
reason='BAD REQUEST'
|
reason='BAD REQUEST'
|
||||||
)
|
)
|
||||||
user_metadata_client.create(data=person_metadata)
|
person_metadata_client.create(data=person_metadata)
|
||||||
assert str(error.value) == f'400 Client Error: BAD REQUEST for url: {define_metadata_pointer_url}'
|
assert str(error.value) == f'400 Client Error: BAD REQUEST for url: {define_metadata_pointer_url}'
|
||||||
|
|
||||||
|
|
||||||
def test_edit_user_metadata(caplog,
|
def test_edit_person_metadata(caplog,
|
||||||
create_activated_user,
|
create_activated_user,
|
||||||
define_metadata_pointer_url,
|
define_metadata_pointer_url,
|
||||||
load_config,
|
load_config,
|
||||||
@ -61,7 +61,7 @@ def test_edit_user_metadata(caplog,
|
|||||||
setup_metadata_signer):
|
setup_metadata_signer):
|
||||||
Signer.gpg_passphrase = load_config.get('KEYS_PASSPHRASE')
|
Signer.gpg_passphrase = load_config.get('KEYS_PASSPHRASE')
|
||||||
identifier = blockchain_address_to_metadata_pointer(blockchain_address=create_activated_user.blockchain_address)
|
identifier = blockchain_address_to_metadata_pointer(blockchain_address=create_activated_user.blockchain_address)
|
||||||
user_metadata_client = UserMetadata(identifier=identifier)
|
person_metadata_client = PersonMetadata(identifier=identifier)
|
||||||
with requests_mock.Mocker(real_http=False) as request_mocker:
|
with requests_mock.Mocker(real_http=False) as request_mocker:
|
||||||
request_mocker.register_uri(
|
request_mocker.register_uri(
|
||||||
'PUT',
|
'PUT',
|
||||||
@ -69,7 +69,7 @@ def test_edit_user_metadata(caplog,
|
|||||||
status_code=200,
|
status_code=200,
|
||||||
reason='OK'
|
reason='OK'
|
||||||
)
|
)
|
||||||
user_metadata_client.edit(data=person_metadata, engine='pgp')
|
person_metadata_client.edit(data=person_metadata)
|
||||||
assert 'Signed content submission status: 200' in caplog.text
|
assert 'Signed content submission status: 200' in caplog.text
|
||||||
|
|
||||||
with pytest.raises(RuntimeError) as error:
|
with pytest.raises(RuntimeError) as error:
|
||||||
@ -80,7 +80,7 @@ def test_edit_user_metadata(caplog,
|
|||||||
status_code=400,
|
status_code=400,
|
||||||
reason='BAD REQUEST'
|
reason='BAD REQUEST'
|
||||||
)
|
)
|
||||||
user_metadata_client.edit(data=person_metadata, engine='pgp')
|
person_metadata_client.edit(data=person_metadata)
|
||||||
assert str(error.value) == f'400 Client Error: BAD REQUEST for url: {define_metadata_pointer_url}'
|
assert str(error.value) == f'400 Client Error: BAD REQUEST for url: {define_metadata_pointer_url}'
|
||||||
|
|
||||||
|
|
||||||
@ -92,7 +92,7 @@ def test_get_user_metadata(caplog,
|
|||||||
person_metadata,
|
person_metadata,
|
||||||
setup_metadata_signer):
|
setup_metadata_signer):
|
||||||
identifier = blockchain_address_to_metadata_pointer(blockchain_address=create_activated_user.blockchain_address)
|
identifier = blockchain_address_to_metadata_pointer(blockchain_address=create_activated_user.blockchain_address)
|
||||||
user_metadata_client = UserMetadata(identifier=identifier)
|
person_metadata_client = PersonMetadata(identifier=identifier)
|
||||||
with requests_mock.Mocker(real_http=False) as request_mocker:
|
with requests_mock.Mocker(real_http=False) as request_mocker:
|
||||||
request_mocker.register_uri(
|
request_mocker.register_uri(
|
||||||
'GET',
|
'GET',
|
||||||
@ -101,7 +101,7 @@ def test_get_user_metadata(caplog,
|
|||||||
content=json.dumps(person_metadata).encode('utf-8'),
|
content=json.dumps(person_metadata).encode('utf-8'),
|
||||||
reason='OK'
|
reason='OK'
|
||||||
)
|
)
|
||||||
user_metadata_client.query()
|
person_metadata_client.query()
|
||||||
assert 'Get latest data status: 200' in caplog.text
|
assert 'Get latest data status: 200' in caplog.text
|
||||||
key = generate_metadata_pointer(
|
key = generate_metadata_pointer(
|
||||||
identifier=identifier,
|
identifier=identifier,
|
||||||
@ -118,6 +118,6 @@ def test_get_user_metadata(caplog,
|
|||||||
status_code=404,
|
status_code=404,
|
||||||
reason='NOT FOUND'
|
reason='NOT FOUND'
|
||||||
)
|
)
|
||||||
user_metadata_client.query()
|
person_metadata_client.query()
|
||||||
assert 'The data is not available and might need to be added.' in caplog.text
|
assert 'The data is not available and might need to be added.' in caplog.text
|
||||||
assert str(error.value) == f'400 Client Error: NOT FOUND for url: {define_metadata_pointer_url}'
|
assert str(error.value) == f'400 Client Error: NOT FOUND for url: {define_metadata_pointer_url}'
|
||||||
|
@ -15,7 +15,7 @@ from cic_ussd.state_machine.logic.user import (
|
|||||||
get_user_metadata,
|
get_user_metadata,
|
||||||
save_complete_user_metadata,
|
save_complete_user_metadata,
|
||||||
process_gender_user_input,
|
process_gender_user_input,
|
||||||
save_profile_attribute_to_session_data,
|
save_metadata_attribute_to_session_data,
|
||||||
update_account_status_to_active)
|
update_account_status_to_active)
|
||||||
|
|
||||||
|
|
||||||
@ -41,7 +41,7 @@ def test_update_account_status_to_active(create_pending_user, create_in_db_ussd_
|
|||||||
("enter_location", "location", "Kangemi", "Kangemi"),
|
("enter_location", "location", "Kangemi", "Kangemi"),
|
||||||
("enter_products", "products", "Mandazi", "Mandazi"),
|
("enter_products", "products", "Mandazi", "Mandazi"),
|
||||||
])
|
])
|
||||||
def test_save_save_profile_attribute_to_session_data(current_state,
|
def test_save_metadata_attribute_to_session_data(current_state,
|
||||||
expected_key,
|
expected_key,
|
||||||
expected_result,
|
expected_result,
|
||||||
user_input,
|
user_input,
|
||||||
@ -56,7 +56,7 @@ def test_save_save_profile_attribute_to_session_data(current_state,
|
|||||||
in_memory_ussd_session = json.loads(in_memory_ussd_session)
|
in_memory_ussd_session = json.loads(in_memory_ussd_session)
|
||||||
assert in_memory_ussd_session.get('session_data') == {}
|
assert in_memory_ussd_session.get('session_data') == {}
|
||||||
serialized_in_db_ussd_session['state'] = current_state
|
serialized_in_db_ussd_session['state'] = current_state
|
||||||
save_profile_attribute_to_session_data(state_machine_data=state_machine_data)
|
save_metadata_attribute_to_session_data(state_machine_data=state_machine_data)
|
||||||
|
|
||||||
in_memory_ussd_session = InMemoryStore.cache.get('AT974186')
|
in_memory_ussd_session = InMemoryStore.cache.get('AT974186')
|
||||||
in_memory_ussd_session = json.loads(in_memory_ussd_session)
|
in_memory_ussd_session = json.loads(in_memory_ussd_session)
|
||||||
@ -98,7 +98,7 @@ def test_save_complete_user_metadata(celery_session_worker,
|
|||||||
ussd_session['session_data'] = complete_user_metadata
|
ussd_session['session_data'] = complete_user_metadata
|
||||||
user_metadata = format_user_metadata(metadata=ussd_session.get('session_data'), user=create_activated_user)
|
user_metadata = format_user_metadata(metadata=ussd_session.get('session_data'), user=create_activated_user)
|
||||||
state_machine_data = ('', ussd_session, create_activated_user)
|
state_machine_data = ('', ussd_session, create_activated_user)
|
||||||
mocked_create_metadata_task = mocker.patch('cic_ussd.tasks.metadata.create_user_metadata.apply_async')
|
mocked_create_metadata_task = mocker.patch('cic_ussd.tasks.metadata.create_person_metadata.apply_async')
|
||||||
save_complete_user_metadata(state_machine_data=state_machine_data)
|
save_complete_user_metadata(state_machine_data=state_machine_data)
|
||||||
mocked_create_metadata_task.assert_called_with(
|
mocked_create_metadata_task.assert_called_with(
|
||||||
(user_metadata, create_activated_user.blockchain_address),
|
(user_metadata, create_activated_user.blockchain_address),
|
||||||
@ -127,7 +127,7 @@ def test_edit_user_metadata_attribute(celery_session_worker,
|
|||||||
}
|
}
|
||||||
state_machine_data = ('', ussd_session, create_activated_user)
|
state_machine_data = ('', ussd_session, create_activated_user)
|
||||||
|
|
||||||
mocked_edit_metadata = mocker.patch('cic_ussd.tasks.metadata.edit_user_metadata.apply_async')
|
mocked_edit_metadata = mocker.patch('cic_ussd.tasks.metadata.edit_person_metadata.apply_async')
|
||||||
edit_user_metadata_attribute(state_machine_data=state_machine_data)
|
edit_user_metadata_attribute(state_machine_data=state_machine_data)
|
||||||
person_metadata['location']['area_name'] = 'nairobi'
|
person_metadata['location']['area_name'] = 'nairobi'
|
||||||
mocked_edit_metadata.assert_called_with(
|
mocked_edit_metadata.assert_called_with(
|
||||||
@ -146,7 +146,7 @@ def test_get_user_metadata_attribute(celery_session_worker,
|
|||||||
ussd_session = json.loads(ussd_session)
|
ussd_session = json.loads(ussd_session)
|
||||||
state_machine_data = ('', ussd_session, create_activated_user)
|
state_machine_data = ('', ussd_session, create_activated_user)
|
||||||
|
|
||||||
mocked_get_metadata = mocker.patch('cic_ussd.tasks.metadata.query_user_metadata.apply_async')
|
mocked_get_metadata = mocker.patch('cic_ussd.tasks.metadata.query_person_metadata.apply_async')
|
||||||
get_user_metadata(state_machine_data=state_machine_data)
|
get_user_metadata(state_machine_data=state_machine_data)
|
||||||
mocked_get_metadata.assert_called_with(
|
mocked_get_metadata.assert_called_with(
|
||||||
(create_activated_user.blockchain_address,),
|
(create_activated_user.blockchain_address,),
|
||||||
|
8
apps/cic-ussd/tests/fixtures/config.py
vendored
8
apps/cic-ussd/tests/fixtures/config.py
vendored
@ -18,7 +18,7 @@ from cic_ussd.files.local_files import create_local_file_data_stores, json_file_
|
|||||||
from cic_ussd.menu.ussd_menu import UssdMenu
|
from cic_ussd.menu.ussd_menu import UssdMenu
|
||||||
from cic_ussd.metadata import blockchain_address_to_metadata_pointer
|
from cic_ussd.metadata import blockchain_address_to_metadata_pointer
|
||||||
from cic_ussd.metadata.signer import Signer
|
from cic_ussd.metadata.signer import Signer
|
||||||
from cic_ussd.metadata.user import UserMetadata
|
from cic_ussd.metadata.person import PersonMetadata
|
||||||
from cic_ussd.state_machine import UssdStateMachine
|
from cic_ussd.state_machine import UssdStateMachine
|
||||||
|
|
||||||
|
|
||||||
@ -121,9 +121,9 @@ def setup_metadata_signer(load_config):
|
|||||||
@pytest.fixture(scope='function')
|
@pytest.fixture(scope='function')
|
||||||
def define_metadata_pointer_url(load_config, create_activated_user):
|
def define_metadata_pointer_url(load_config, create_activated_user):
|
||||||
identifier = blockchain_address_to_metadata_pointer(blockchain_address=create_activated_user.blockchain_address)
|
identifier = blockchain_address_to_metadata_pointer(blockchain_address=create_activated_user.blockchain_address)
|
||||||
UserMetadata.base_url = load_config.get('CIC_META_URL')
|
PersonMetadata.base_url = load_config.get('CIC_META_URL')
|
||||||
user_metadata_client = UserMetadata(identifier=identifier)
|
person_metadata_client = PersonMetadata(identifier=identifier)
|
||||||
return user_metadata_client.url
|
return person_metadata_client.url
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(scope='function')
|
@pytest.fixture(scope='function')
|
||||||
|
Loading…
Reference in New Issue
Block a user