cic-internal-integration/apps/cic-ussd/cic_ussd/state_machine/logic/language.py

96 lines
3.1 KiB
Python
Raw Normal View History

2022-01-04 17:16:00 +01:00
# standard imports
import json
from typing import Tuple
# external imports
import celery
import i18n
from cic_types.condiments import MetadataPointer
from sqlalchemy.orm.session import Session
# local imports
from cic_ussd.cache import cache_data_key, get_cached_data
from cic_ussd.db.models.account import Account
2022-01-10 05:44:50 +01:00
from cic_ussd.processor.poller import wait_for_cache, wait_for_session_data
2022-01-04 17:16:00 +01:00
from cic_ussd.session.ussd_session import save_session_data
from cic_ussd.translation import Languages
def is_valid_language_selection(state_machine_data: Tuple[str, dict, Account, Session]):
"""
:param state_machine_data:
:type state_machine_data:
:return:
:rtype:
"""
user_input, ussd_session, account, session = state_machine_data
key = cache_data_key('system:languages'.encode('utf-8'), MetadataPointer.NONE)
cached_system_languages = get_cached_data(key)
language_list = json.loads(cached_system_languages)
if not language_list:
wait_for_cache(identifier='system:languages'.encode('utf-8'), resource_name='Languages list', salt=MetadataPointer.NONE)
if user_input in ['00', '11', '22']:
return False
user_input = int(user_input)
return user_input <= len(language_list)
def change_preferred_language(state_machine_data: Tuple[str, dict, Account, Session]):
"""
:param state_machine_data:
:type state_machine_data:
:return:
:rtype:
"""
process_language_selection(state_machine_data=state_machine_data)
user_input, ussd_session, account, session = state_machine_data
wait_for_session_data(resource_name='Preferred language', session_data_key='preferred_language', ussd_session=ussd_session)
preferred_language = ussd_session.get('data').get('preferred_language')
preferences_data = {
'preferred_language': preferred_language
}
s = celery.signature(
'cic_ussd.tasks.metadata.add_preferences_metadata',
[account.blockchain_address, preferences_data],
queue='cic-ussd'
)
return s.apply_async()
def process_language_selection(state_machine_data: Tuple[str, dict, Account, Session]):
"""
:param state_machine_data:
:type state_machine_data:
:return:
:rtype:
"""
user_input, ussd_session, account, session = state_machine_data
preferred_language = preferred_langauge_from_selection(user_input=user_input)
data = {
'preferred_language': preferred_language
}
save_session_data(queue='cic-ussd', session=session, data=data, ussd_session=ussd_session)
def preferred_langauge_from_selection(user_input: str):
"""
:param user_input:
:type user_input:
:return:
:rtype:
"""
key = cache_data_key('system:languages'.encode('utf-8'), MetadataPointer.NONE)
cached_system_languages = get_cached_data(key)
language_list = json.loads(cached_system_languages)
user_input = int(user_input)
selected_language = language_list[user_input - 1]
preferred_language = i18n.config.get('fallback')
for key, value in Languages.languages_dict.items():
if selected_language[3:] == value:
preferred_language = key
return preferred_language