Addresses remaining bullets in hardening issue.
This commit is contained in:
parent
55266fd721
commit
b43d9618d9
@ -13,7 +13,6 @@ from cic_types.condiments import MetadataPointer
|
||||
from cic_ussd.account.chain import Chain
|
||||
from cic_ussd.account.transaction import from_wei
|
||||
from cic_ussd.cache import cache_data_key, get_cached_data
|
||||
from cic_ussd.translation import translation_for
|
||||
|
||||
logg = logging.getLogger(__name__)
|
||||
|
||||
@ -97,17 +96,3 @@ def query_statement(blockchain_address: str, limit: int = 9):
|
||||
callback_param=blockchain_address
|
||||
)
|
||||
cic_eth_api.list(address=blockchain_address, limit=limit)
|
||||
|
||||
|
||||
def statement_transaction_set(preferred_language: str, transaction_reprs: list):
|
||||
"""
|
||||
:param preferred_language:
|
||||
:type preferred_language:
|
||||
:param transaction_reprs:
|
||||
:type transaction_reprs:
|
||||
:return:
|
||||
:rtype:
|
||||
"""
|
||||
if not transaction_reprs:
|
||||
return translation_for('helpers.no_transaction_history', preferred_language)
|
||||
return ''.join(f'{transaction_repr}\n' for transaction_repr in transaction_reprs)
|
||||
|
@ -15,7 +15,6 @@ from cic_ussd.cache import cache_data, cache_data_key, get_cached_data
|
||||
from cic_ussd.error import CachedDataNotFoundError, SeppukuError
|
||||
from cic_ussd.metadata.tokens import query_token_info, query_token_metadata
|
||||
from cic_ussd.processor.util import wait_for_cache
|
||||
from cic_ussd.translation import translation_for
|
||||
|
||||
logg = logging.getLogger(__file__)
|
||||
|
||||
@ -326,16 +325,3 @@ def set_active_token(blockchain_address: str, token_symbol: str):
|
||||
cache_data(key=key, data=token_symbol)
|
||||
|
||||
|
||||
def token_list_set(preferred_language: str, token_data_reprs: list):
|
||||
"""
|
||||
:param preferred_language:
|
||||
:type preferred_language:
|
||||
:param token_data_reprs:
|
||||
:type token_data_reprs:
|
||||
:return:
|
||||
:rtype:
|
||||
"""
|
||||
if not token_data_reprs:
|
||||
return translation_for('helpers.no_tokens_list', preferred_language)
|
||||
return ''.join(f'{token_data_repr}\n' for token_data_repr in token_data_reprs)
|
||||
|
||||
|
@ -55,5 +55,6 @@ def cache_data_key(identifier: Union[list, bytes], salt: MetadataPointer):
|
||||
hash_object.update(identity)
|
||||
else:
|
||||
hash_object.update(identifier)
|
||||
hash_object.update(salt.value.encode(encoding="utf-8"))
|
||||
if salt != MetadataPointer.NONE:
|
||||
hash_object.update(salt.value.encode(encoding="utf-8"))
|
||||
return hash_object.digest().hex()
|
||||
|
@ -19,34 +19,33 @@ from cic_ussd.account.metadata import get_cached_preferred_language
|
||||
from cic_ussd.account.statement import (
|
||||
get_cached_statement,
|
||||
parse_statement_transactions,
|
||||
query_statement,
|
||||
statement_transaction_set
|
||||
)
|
||||
query_statement)
|
||||
from cic_ussd.account.tokens import (create_account_tokens_list,
|
||||
get_active_token_symbol,
|
||||
get_cached_token_data,
|
||||
get_cached_token_symbol_list,
|
||||
get_cached_token_data_list,
|
||||
parse_token_list,
|
||||
token_list_set)
|
||||
parse_token_list)
|
||||
from cic_ussd.account.transaction import from_wei, to_wei
|
||||
from cic_ussd.cache import cache_data_key, cache_data
|
||||
from cic_ussd.cache import cache_data_key, cache_data, get_cached_data
|
||||
from cic_ussd.db.models.account import Account
|
||||
from cic_ussd.metadata import PersonMetadata
|
||||
from cic_ussd.phone_number import Support
|
||||
from cic_ussd.processor.util import parse_person_metadata
|
||||
from cic_ussd.processor.util import parse_person_metadata, ussd_menu_list, wait_for_session_data
|
||||
from cic_ussd.session.ussd_session import save_session_data
|
||||
from cic_ussd.state_machine.logic.language import preferred_langauge_from_selection
|
||||
from cic_ussd.translation import translation_for
|
||||
from sqlalchemy.orm.session import Session
|
||||
|
||||
logg = logging.getLogger(__name__)
|
||||
logg = logging.getLogger(__file__)
|
||||
|
||||
|
||||
class MenuProcessor:
|
||||
def __init__(self, account: Account, display_key: str, menu_name: str, session: Session, ussd_session: dict):
|
||||
self.account = account
|
||||
self.display_key = display_key
|
||||
self.identifier = bytes.fromhex(self.account.blockchain_address)
|
||||
if account:
|
||||
self.identifier = bytes.fromhex(self.account.blockchain_address)
|
||||
self.menu_name = menu_name
|
||||
self.session = session
|
||||
self.ussd_session = ussd_session
|
||||
@ -89,36 +88,29 @@ class MenuProcessor:
|
||||
:rtype:
|
||||
"""
|
||||
cached_statement = get_cached_statement(self.account.blockchain_address)
|
||||
transaction_sets = []
|
||||
if cached_statement:
|
||||
statement = json.loads(cached_statement)
|
||||
statement_transactions = parse_statement_transactions(statement)
|
||||
transaction_sets = [statement_transactions[tx:tx + 3] for tx in range(0, len(statement_transactions), 3)]
|
||||
|
||||
preferred_language = get_cached_preferred_language(self.account.blockchain_address)
|
||||
if not preferred_language:
|
||||
preferred_language = i18n.config.get('fallback')
|
||||
no_transaction_history = statement_transaction_set(preferred_language, transaction_sets)
|
||||
first_transaction_set = no_transaction_history
|
||||
middle_transaction_set = no_transaction_history
|
||||
last_transaction_set = no_transaction_history
|
||||
if transaction_sets:
|
||||
first_transaction_set = statement_transaction_set(preferred_language, transaction_sets[0])
|
||||
if len(transaction_sets) >= 2:
|
||||
middle_transaction_set = statement_transaction_set(preferred_language, transaction_sets[1])
|
||||
if len(transaction_sets) >= 3:
|
||||
last_transaction_set = statement_transaction_set(preferred_language, transaction_sets[2])
|
||||
|
||||
if self.display_key == 'ussd.kenya.first_transaction_set':
|
||||
statement_list = []
|
||||
if cached_statement:
|
||||
statement_list = parse_statement_transactions(statement=json.loads(cached_statement))
|
||||
|
||||
fallback = translation_for('helpers.no_transaction_history', preferred_language)
|
||||
transaction_sets = ussd_menu_list(fallback=fallback, menu_list=statement_list, split=3)
|
||||
|
||||
if self.display_key == 'ussd.first_transaction_set':
|
||||
return translation_for(
|
||||
self.display_key, preferred_language, first_transaction_set=first_transaction_set
|
||||
self.display_key, preferred_language, first_transaction_set=transaction_sets[0]
|
||||
)
|
||||
if self.display_key == 'ussd.kenya.middle_transaction_set':
|
||||
if self.display_key == 'ussd.middle_transaction_set':
|
||||
return translation_for(
|
||||
self.display_key, preferred_language, middle_transaction_set=middle_transaction_set
|
||||
self.display_key, preferred_language, middle_transaction_set=transaction_sets[1]
|
||||
)
|
||||
if self.display_key == 'ussd.kenya.last_transaction_set':
|
||||
if self.display_key == 'ussd.last_transaction_set':
|
||||
return translation_for(
|
||||
self.display_key, preferred_language, last_transaction_set=last_transaction_set
|
||||
self.display_key, preferred_language, last_transaction_set=transaction_sets[2]
|
||||
)
|
||||
|
||||
def add_guardian_pin_authorization(self):
|
||||
@ -129,7 +121,7 @@ class MenuProcessor:
|
||||
preferred_language = get_cached_preferred_language(self.account.blockchain_address)
|
||||
if not preferred_language:
|
||||
preferred_language = i18n.config.get('fallback')
|
||||
set_guardians = self.account.get_guardians()
|
||||
set_guardians = self.account.get_guardians()[:3]
|
||||
if set_guardians:
|
||||
guardians_list = ''
|
||||
guardians_list_header = translation_for('helpers.guardians_list_header', preferred_language)
|
||||
@ -145,36 +137,30 @@ class MenuProcessor:
|
||||
def account_tokens(self) -> str:
|
||||
cached_token_data_list = get_cached_token_data_list(self.account.blockchain_address)
|
||||
token_data_list = parse_token_list(cached_token_data_list)
|
||||
token_list_sets = [token_data_list[tds:tds + 3] for tds in range(0, len(token_data_list), 3)]
|
||||
|
||||
preferred_language = get_cached_preferred_language(self.account.blockchain_address)
|
||||
if not preferred_language:
|
||||
preferred_language = i18n.config.get('fallback')
|
||||
no_token_list = token_list_set(preferred_language, [])
|
||||
first_account_tokens_set = no_token_list
|
||||
middle_account_tokens_set = no_token_list
|
||||
last_account_tokens_set = no_token_list
|
||||
if token_list_sets:
|
||||
data = {
|
||||
'account_tokens_list': cached_token_data_list
|
||||
}
|
||||
save_session_data(data=data, queue='cic-ussd', session=self.session, ussd_session=self.ussd_session)
|
||||
first_account_tokens_set = token_list_set(preferred_language, token_list_sets[0])
|
||||
|
||||
if len(token_list_sets) >= 2:
|
||||
middle_account_tokens_set = token_list_set(preferred_language, token_list_sets[1])
|
||||
if len(token_list_sets) >= 3:
|
||||
last_account_tokens_set = token_list_set(preferred_language, token_list_sets[2])
|
||||
if self.display_key == 'ussd.kenya.first_account_tokens_set':
|
||||
fallback = translation_for('helpers.no_tokens_list', preferred_language)
|
||||
token_list_sets = ussd_menu_list(fallback=fallback, menu_list=token_data_list, split=3)
|
||||
|
||||
data = {
|
||||
'account_tokens_list': cached_token_data_list
|
||||
}
|
||||
save_session_data(data=data, queue='cic-ussd', session=self.session, ussd_session=self.ussd_session)
|
||||
|
||||
if self.display_key == 'ussd.first_account_tokens_set':
|
||||
return translation_for(
|
||||
self.display_key, preferred_language, first_account_tokens_set=first_account_tokens_set
|
||||
self.display_key, preferred_language, first_account_tokens_set=token_list_sets[0]
|
||||
)
|
||||
if self.display_key == 'ussd.kenya.middle_account_tokens_set':
|
||||
if self.display_key == 'ussd.middle_account_tokens_set':
|
||||
return translation_for(
|
||||
self.display_key, preferred_language, middle_account_tokens_set=middle_account_tokens_set
|
||||
self.display_key, preferred_language, middle_account_tokens_set=token_list_sets[1]
|
||||
)
|
||||
if self.display_key == 'ussd.kenya.last_account_tokens_set':
|
||||
if self.display_key == 'ussd.last_account_tokens_set':
|
||||
return translation_for(
|
||||
self.display_key, preferred_language, last_account_tokens_set=last_account_tokens_set
|
||||
self.display_key, preferred_language, last_account_tokens_set=token_list_sets[2]
|
||||
)
|
||||
|
||||
def help(self) -> str:
|
||||
@ -222,7 +208,7 @@ class MenuProcessor:
|
||||
remaining_attempts = 3
|
||||
remaining_attempts -= self.account.failed_pin_attempts
|
||||
retry_pin_entry = translation_for(
|
||||
'ussd.kenya.retry_pin_entry', preferred_language, remaining_attempts=remaining_attempts
|
||||
'ussd.retry_pin_entry', preferred_language, remaining_attempts=remaining_attempts
|
||||
)
|
||||
return translation_for(
|
||||
f'{self.display_key}.retry', preferred_language, retry_pin_entry=retry_pin_entry
|
||||
@ -238,6 +224,38 @@ class MenuProcessor:
|
||||
guardian = Account.get_by_phone_number(guardian_phone_number, self.session)
|
||||
return guardian.standard_metadata_id()
|
||||
|
||||
def language(self):
|
||||
key = cache_data_key('system:languages'.encode('utf-8'), MetadataPointer.NONE)
|
||||
cached_system_languages = get_cached_data(key)
|
||||
language_list: list = json.loads(cached_system_languages)
|
||||
|
||||
if self.account:
|
||||
preferred_language = get_cached_preferred_language(self.account.blockchain_address)
|
||||
else:
|
||||
preferred_language = i18n.config.get('fallback')
|
||||
|
||||
fallback = translation_for('helpers.no_language_list', preferred_language)
|
||||
language_list_sets = ussd_menu_list(fallback=fallback, menu_list=language_list, split=3)
|
||||
|
||||
if self.display_key in ['ussd.initial_language_selection', 'ussd.select_preferred_language']:
|
||||
return translation_for(
|
||||
self.display_key, preferred_language, first_language_set=language_list_sets[0]
|
||||
)
|
||||
|
||||
if 'middle_language_set' in self.display_key:
|
||||
return translation_for(
|
||||
self.display_key, preferred_language, middle_language_set=language_list_sets[1]
|
||||
)
|
||||
|
||||
if 'last_language_set' in self.display_key:
|
||||
return translation_for(
|
||||
self.display_key, preferred_language, last_language_set=language_list_sets[2]
|
||||
)
|
||||
|
||||
def account_creation_prompt(self):
|
||||
preferred_language = preferred_langauge_from_selection(self.ussd_session.get('user_input'))
|
||||
return translation_for(self.display_key, preferred_language)
|
||||
|
||||
def reset_guarded_pin_authorization(self):
|
||||
guarded_account_information = self.guarded_account_metadata()
|
||||
return self.pin_authorization(guarded_account_information=guarded_account_information)
|
||||
@ -381,8 +399,9 @@ class MenuProcessor:
|
||||
)
|
||||
|
||||
def exit_invalid_menu_option(self):
|
||||
preferred_language = get_cached_preferred_language(self.account.blockchain_address)
|
||||
if not preferred_language:
|
||||
if self.account:
|
||||
preferred_language = get_cached_preferred_language(self.account.blockchain_address)
|
||||
else:
|
||||
preferred_language = i18n.config.get('fallback')
|
||||
return translation_for(self.display_key, preferred_language, support_phone=Support.phone_number)
|
||||
|
||||
@ -390,7 +409,7 @@ class MenuProcessor:
|
||||
preferred_language = get_cached_preferred_language(self.account.blockchain_address)
|
||||
if not preferred_language:
|
||||
preferred_language = i18n.config.get('fallback')
|
||||
return translation_for('ussd.kenya.exit_pin_blocked', preferred_language, support_phone=Support.phone_number)
|
||||
return translation_for('ussd.exit_pin_blocked', preferred_language, support_phone=Support.phone_number)
|
||||
|
||||
def exit_successful_token_selection(self) -> str:
|
||||
selected_token = self.ussd_session.get('data').get('selected_token')
|
||||
@ -445,6 +464,9 @@ def response(account: Account, display_key: str, menu_name: str, session: Sessio
|
||||
"""
|
||||
menu_processor = MenuProcessor(account, display_key, menu_name, session, ussd_session)
|
||||
|
||||
if menu_name == 'account_creation_prompt':
|
||||
return menu_processor.account_creation_prompt()
|
||||
|
||||
if menu_name == 'start':
|
||||
return menu_processor.start_menu()
|
||||
|
||||
@ -502,6 +524,9 @@ def response(account: Account, display_key: str, menu_name: str, session: Sessio
|
||||
if 'account_tokens_set' in menu_name:
|
||||
return menu_processor.account_tokens()
|
||||
|
||||
if 'language' in menu_name:
|
||||
return menu_processor.language()
|
||||
|
||||
if menu_name == 'display_user_metadata':
|
||||
return menu_processor.person_metadata()
|
||||
|
||||
@ -515,5 +540,4 @@ def response(account: Account, display_key: str, menu_name: str, session: Sessio
|
||||
return menu_processor.exit_successful_token_selection()
|
||||
|
||||
preferred_language = get_cached_preferred_language(account.blockchain_address)
|
||||
|
||||
return translation_for(display_key, preferred_language)
|
||||
|
@ -8,7 +8,7 @@ from sqlalchemy.orm.session import Session
|
||||
from tinydb.table import Document
|
||||
|
||||
# local imports
|
||||
from cic_ussd.db.models.account import Account, create
|
||||
from cic_ussd.db.models.account import Account
|
||||
from cic_ussd.db.models.base import SessionBase
|
||||
from cic_ussd.db.models.ussd_session import UssdSession
|
||||
from cic_ussd.menu.ussd_menu import UssdMenu
|
||||
@ -16,7 +16,6 @@ from cic_ussd.processor.menu import response
|
||||
from cic_ussd.processor.util import latest_input, resume_last_ussd_session
|
||||
from cic_ussd.session.ussd_session import create_or_update_session, persist_ussd_session
|
||||
from cic_ussd.state_machine import UssdStateMachine
|
||||
from cic_ussd.translation import translation_for
|
||||
from cic_ussd.validator import is_valid_response
|
||||
|
||||
|
||||
@ -36,9 +35,6 @@ def handle_menu(account: Account, session: Session) -> Document:
|
||||
last_ussd_session = UssdSession.last_ussd_session(account.phone_number, session)
|
||||
if last_ussd_session:
|
||||
return resume_last_ussd_session(last_ussd_session.state)
|
||||
|
||||
elif not account.has_preferred_language():
|
||||
return UssdMenu.find_by_name('initial_language_selection')
|
||||
else:
|
||||
return UssdMenu.find_by_name('initial_pin_entry')
|
||||
|
||||
@ -71,16 +67,13 @@ def get_menu(account: Account,
|
||||
return UssdMenu.find_by_name(state)
|
||||
|
||||
|
||||
def handle_menu_operations(chain_str: str,
|
||||
external_session_id: str,
|
||||
def handle_menu_operations(external_session_id: str,
|
||||
phone_number: str,
|
||||
queue: str,
|
||||
service_code: str,
|
||||
session,
|
||||
user_input: str):
|
||||
"""
|
||||
:param chain_str:
|
||||
:type chain_str:
|
||||
:param external_session_id:
|
||||
:type external_session_id:
|
||||
:param phone_number:
|
||||
@ -100,10 +93,38 @@ def handle_menu_operations(chain_str: str,
|
||||
account: Account = Account.get_by_phone_number(phone_number, session)
|
||||
if account:
|
||||
return handle_account_menu_operations(account, external_session_id, queue, session, service_code, user_input)
|
||||
create(chain_str, phone_number, session)
|
||||
menu = UssdMenu.find_by_name('account_creation_prompt')
|
||||
preferred_language = i18n.config.get('fallback')
|
||||
create_or_update_session(
|
||||
else:
|
||||
return handle_no_account_menu_operations(
|
||||
account, external_session_id, phone_number, queue, session, service_code, user_input)
|
||||
|
||||
|
||||
def handle_no_account_menu_operations(account: Optional[Account],
|
||||
external_session_id: str,
|
||||
phone_number: str,
|
||||
queue: str,
|
||||
session: Session,
|
||||
service_code: str,
|
||||
user_input: str):
|
||||
"""
|
||||
:param account:
|
||||
:type account:
|
||||
:param external_session_id:
|
||||
:type external_session_id:
|
||||
:param phone_number:
|
||||
:type phone_number:
|
||||
:param queue:
|
||||
:type queue:
|
||||
:param session:
|
||||
:type session:
|
||||
:param service_code:
|
||||
:type service_code:
|
||||
:param user_input:
|
||||
:type user_input:
|
||||
:return:
|
||||
:rtype:
|
||||
"""
|
||||
menu = UssdMenu.find_by_name('initial_language_selection')
|
||||
ussd_session = create_or_update_session(
|
||||
external_session_id=external_session_id,
|
||||
msisdn=phone_number,
|
||||
service_code=service_code,
|
||||
@ -111,7 +132,20 @@ def handle_menu_operations(chain_str: str,
|
||||
session=session,
|
||||
user_input=user_input)
|
||||
persist_ussd_session(external_session_id, queue)
|
||||
return translation_for('ussd.kenya.account_creation_prompt', preferred_language)
|
||||
last_ussd_session: UssdSession = UssdSession.last_ussd_session(phone_number, session)
|
||||
if last_ussd_session:
|
||||
if not user_input:
|
||||
menu = resume_last_ussd_session(last_ussd_session.state)
|
||||
else:
|
||||
session = SessionBase.bind_session(session)
|
||||
state = next_state(account, session, user_input, last_ussd_session.to_json())
|
||||
menu = UssdMenu.find_by_name(state)
|
||||
|
||||
return response(account=account,
|
||||
display_key=menu.get('display_key'),
|
||||
menu_name=menu.get('name'),
|
||||
session=session,
|
||||
ussd_session=ussd_session.to_json())
|
||||
|
||||
|
||||
def handle_account_menu_operations(account: Account,
|
||||
@ -152,15 +186,12 @@ def handle_account_menu_operations(account: Account,
|
||||
if last_ussd_session:
|
||||
ussd_session = create_or_update_session(
|
||||
external_session_id, phone_number, service_code, user_input, menu.get('name'), session,
|
||||
last_ussd_session.data
|
||||
)
|
||||
last_ussd_session.data)
|
||||
else:
|
||||
ussd_session = create_or_update_session(
|
||||
external_session_id, phone_number, service_code, user_input, menu.get('name'), session, None
|
||||
)
|
||||
external_session_id, phone_number, service_code, user_input, menu.get('name'), session, {})
|
||||
menu_response = response(
|
||||
account, menu.get('display_key'), menu.get('name'), session, ussd_session.to_json()
|
||||
)
|
||||
account, menu.get('display_key'), menu.get('name'), session, ussd_session.to_json())
|
||||
if not is_valid_response(menu_response):
|
||||
raise ValueError(f'Invalid response: {response}')
|
||||
persist_ussd_session(external_session_id, queue)
|
||||
|
@ -3,7 +3,7 @@ import datetime
|
||||
import json
|
||||
import logging
|
||||
import time
|
||||
from typing import Union
|
||||
from typing import List, Union
|
||||
|
||||
# external imports
|
||||
from cic_types.condiments import MetadataPointer
|
||||
@ -21,9 +21,7 @@ logg = logging.getLogger(__file__)
|
||||
def latest_input(user_input: str) -> str:
|
||||
"""
|
||||
:param user_input:
|
||||
:type user_input:
|
||||
:return:
|
||||
:rtype:
|
||||
"""
|
||||
return user_input.split('*')[-1]
|
||||
|
||||
@ -85,6 +83,27 @@ def resume_last_ussd_session(last_state: str) -> Document:
|
||||
return UssdMenu.find_by_name(last_state)
|
||||
|
||||
|
||||
def ussd_menu_list(fallback: str, menu_list: list, split: int = 3) -> List[str]:
|
||||
"""
|
||||
:param fallback:
|
||||
:type fallback:
|
||||
:param menu_list:
|
||||
:type menu_list:
|
||||
:param split:
|
||||
:type split:
|
||||
:return:
|
||||
:rtype:
|
||||
"""
|
||||
menu_list_sets = [menu_list[item:item + split] for item in range(0, len(menu_list), split)]
|
||||
menu_list_reprs = []
|
||||
for i in range(split):
|
||||
try:
|
||||
menu_list_reprs.append(''.join(f'{list_set_item}\n' for list_set_item in menu_list_sets[i]).rstrip('\n'))
|
||||
except IndexError:
|
||||
menu_list_reprs.append(fallback)
|
||||
return menu_list_reprs
|
||||
|
||||
|
||||
def wait_for_cache(identifier: Union[list, bytes], resource_name: str, salt: MetadataPointer, interval: int = 1, max_retry: int = 5):
|
||||
"""
|
||||
:param identifier:
|
||||
@ -132,17 +151,28 @@ def wait_for_session_data(resource_name: str, session_data_key: str, ussd_sessio
|
||||
:return:
|
||||
:rtype:
|
||||
"""
|
||||
session_data = ussd_session.get('data').get(session_data_key)
|
||||
counter = 0
|
||||
while session_data is None:
|
||||
logg.debug(f'Waiting for: {resource_name}. Checking after: {interval} ...')
|
||||
data = ussd_session.get('data')
|
||||
data_poller = 0
|
||||
while not data:
|
||||
logg.debug(f'Waiting for data object on ussd session: {ussd_session.get("external_session_id")}')
|
||||
logg.debug(f'Data poller at: {data_poller}. Checking again after: {interval} secs...')
|
||||
time.sleep(interval)
|
||||
counter += 1
|
||||
session_data = ussd_session.get('data').get(session_data_key)
|
||||
if session_data is not None:
|
||||
logg.debug(f'{resource_name} now available.')
|
||||
data_poller += 1
|
||||
if data:
|
||||
logg.debug(f'Data object found, proceeding to poll for: {session_data_key}')
|
||||
break
|
||||
else:
|
||||
if counter == max_retry:
|
||||
logg.debug(f'Could not find: {resource_name} within: {max_retry}')
|
||||
if data:
|
||||
session_data_poller = 0
|
||||
session_data = data.get(session_data_key)
|
||||
while not session_data_key:
|
||||
logg.debug(
|
||||
f'Session data poller at: {data_poller} with max retry at: {max_retry}. Checking again after: {interval} secs...')
|
||||
time.sleep(interval)
|
||||
session_data_poller += 1
|
||||
|
||||
if session_data:
|
||||
logg.debug(f'{resource_name} now available.')
|
||||
break
|
||||
|
||||
elif session_data_poller >= max_retry:
|
||||
logg.debug(f'Could not find data object within: {max_retry}')
|
||||
|
95
apps/cic-ussd/cic_ussd/state_machine/logic/language.py
Normal file
95
apps/cic-ussd/cic_ussd/state_machine/logic/language.py
Normal file
@ -0,0 +1,95 @@
|
||||
# standard imports
|
||||
import json
|
||||
from typing import Tuple
|
||||
|
||||
# external imports
|
||||
import celery
|
||||
import i18n
|
||||
from cic_types.condiments import MetadataPointer
|
||||
from sqlalchemy.orm.session import Session
|
||||
|
||||
# local imports
|
||||
from cic_ussd.cache import cache_data_key, get_cached_data
|
||||
from cic_ussd.db.models.account import Account
|
||||
from cic_ussd.processor.util import wait_for_cache, wait_for_session_data
|
||||
from cic_ussd.session.ussd_session import save_session_data
|
||||
from cic_ussd.translation import Languages
|
||||
|
||||
|
||||
def is_valid_language_selection(state_machine_data: Tuple[str, dict, Account, Session]):
|
||||
"""
|
||||
:param state_machine_data:
|
||||
:type state_machine_data:
|
||||
:return:
|
||||
:rtype:
|
||||
"""
|
||||
user_input, ussd_session, account, session = state_machine_data
|
||||
|
||||
key = cache_data_key('system:languages'.encode('utf-8'), MetadataPointer.NONE)
|
||||
cached_system_languages = get_cached_data(key)
|
||||
language_list = json.loads(cached_system_languages)
|
||||
|
||||
if not language_list:
|
||||
wait_for_cache(identifier='system:languages'.encode('utf-8'), resource_name='Languages list', salt=MetadataPointer.NONE)
|
||||
|
||||
if user_input in ['00', '11', '22']:
|
||||
return False
|
||||
user_input = int(user_input)
|
||||
return user_input <= len(language_list)
|
||||
|
||||
|
||||
def change_preferred_language(state_machine_data: Tuple[str, dict, Account, Session]):
|
||||
"""
|
||||
:param state_machine_data:
|
||||
:type state_machine_data:
|
||||
:return:
|
||||
:rtype:
|
||||
"""
|
||||
process_language_selection(state_machine_data=state_machine_data)
|
||||
user_input, ussd_session, account, session = state_machine_data
|
||||
wait_for_session_data(resource_name='Preferred language', session_data_key='preferred_language', ussd_session=ussd_session)
|
||||
preferred_language = ussd_session.get('data').get('preferred_language')
|
||||
preferences_data = {
|
||||
'preferred_language': preferred_language
|
||||
}
|
||||
|
||||
s = celery.signature(
|
||||
'cic_ussd.tasks.metadata.add_preferences_metadata',
|
||||
[account.blockchain_address, preferences_data],
|
||||
queue='cic-ussd'
|
||||
)
|
||||
return s.apply_async()
|
||||
|
||||
|
||||
def process_language_selection(state_machine_data: Tuple[str, dict, Account, Session]):
|
||||
"""
|
||||
:param state_machine_data:
|
||||
:type state_machine_data:
|
||||
:return:
|
||||
:rtype:
|
||||
"""
|
||||
user_input, ussd_session, account, session = state_machine_data
|
||||
preferred_language = preferred_langauge_from_selection(user_input=user_input)
|
||||
data = {
|
||||
'preferred_language': preferred_language
|
||||
}
|
||||
save_session_data(queue='cic-ussd', session=session, data=data, ussd_session=ussd_session)
|
||||
|
||||
|
||||
def preferred_langauge_from_selection(user_input: str):
|
||||
"""
|
||||
:param user_input:
|
||||
:type user_input:
|
||||
:return:
|
||||
:rtype:
|
||||
"""
|
||||
key = cache_data_key('system:languages'.encode('utf-8'), MetadataPointer.NONE)
|
||||
cached_system_languages = get_cached_data(key)
|
||||
language_list = json.loads(cached_system_languages)
|
||||
user_input = int(user_input)
|
||||
selected_language = language_list[user_input - 1]
|
||||
preferred_language = i18n.config.get('fallback')
|
||||
for key, value in Languages.languages_dict.items():
|
||||
if selected_language[3:] == value:
|
||||
preferred_language = key
|
||||
return preferred_language
|
@ -23,7 +23,7 @@ def is_valid_token_selection(state_machine_data: Tuple[str, dict, Account, Sessi
|
||||
account_tokens_list = session_data.get('account_tokens_list')
|
||||
if not account_tokens_list:
|
||||
wait_for_session_data('Account token list', session_data_key='account_tokens_list', ussd_session=ussd_session)
|
||||
if user_input not in ['00', '22']:
|
||||
if user_input not in ['00', '11', '22']:
|
||||
try:
|
||||
user_input = int(user_input)
|
||||
return user_input <= len(account_tokens_list)
|
||||
|
@ -1,9 +1,56 @@
|
||||
"""
|
||||
This module is responsible for translation of ussd menu text based on a user's set preferred language.
|
||||
"""
|
||||
# standard imports
|
||||
import json
|
||||
|
||||
import i18n
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
# external imports
|
||||
from cic_translations.processor import generate_translation_files, parse_csv
|
||||
from cic_types.condiments import MetadataPointer
|
||||
|
||||
# local imports
|
||||
from cic_ussd.cache import cache_data, cache_data_key
|
||||
from cic_ussd.validator import validate_presence
|
||||
|
||||
|
||||
def generate_locale_files(locale_dir: str, schema_file_path: str, translation_builder_path: str):
|
||||
""""""
|
||||
translation_builder_files = os.listdir(translation_builder_path)
|
||||
for file in translation_builder_files:
|
||||
props = Path(file)
|
||||
if props.suffix == '.csv':
|
||||
parsed_csv = parse_csv(os.path.join(translation_builder_path, file))
|
||||
generate_translation_files(
|
||||
parsed_csv=parsed_csv,
|
||||
schema_file_path=schema_file_path,
|
||||
translation_file_type=props.stem,
|
||||
translation_file_path=locale_dir
|
||||
)
|
||||
|
||||
|
||||
class Languages:
|
||||
languages_dict: dict = None
|
||||
|
||||
@classmethod
|
||||
def load_languages_dict(cls, languages_file: str):
|
||||
with open(languages_file, "r") as languages_file:
|
||||
cls.languages_dict = json.load(languages_file)
|
||||
|
||||
def cache_system_languages(self):
|
||||
system_languages: list = list(self.languages_dict.values())
|
||||
languages_list = []
|
||||
for i in range(len(system_languages)):
|
||||
language = f'{i + 1}. {system_languages[i]}'
|
||||
languages_list.append(language)
|
||||
|
||||
key = cache_data_key('system:languages'.encode('utf-8'), MetadataPointer.NONE)
|
||||
cache_data(key, json.dumps(languages_list))
|
||||
|
||||
|
||||
def translation_for(key: str, preferred_language: Optional[str] = None, **kwargs) -> str:
|
||||
"""
|
||||
|
@ -11,3 +11,6 @@ transitions=transitions/
|
||||
host =
|
||||
port =
|
||||
ssl =
|
||||
|
||||
[system]
|
||||
guardians_file = var/lib/sys/guardians.txt
|
||||
|
@ -47,11 +47,11 @@ def test_menu_processor(activated_account,
|
||||
preferred_language = get_cached_preferred_language(activated_account.blockchain_address)
|
||||
available_balance = get_cached_available_balance(activated_account.blockchain_address)
|
||||
token_symbol = get_default_token_symbol()
|
||||
with_available_balance = 'ussd.kenya.account_balances.available_balance'
|
||||
with_fees = 'ussd.kenya.account_balances.with_fees'
|
||||
with_available_balance = 'ussd.account_balances.available_balance'
|
||||
with_fees = 'ussd.account_balances.with_fees'
|
||||
ussd_menu = UssdMenu.find_by_name('account_balances')
|
||||
name = ussd_menu.get('name')
|
||||
resp = response(activated_account, 'ussd.kenya.account_balances', name, init_database, generic_ussd_session)
|
||||
resp = response(activated_account, 'ussd.account_balances', name, init_database, generic_ussd_session)
|
||||
assert resp == translation_for(with_available_balance,
|
||||
preferred_language,
|
||||
available_balance=available_balance,
|
||||
@ -61,7 +61,7 @@ def test_menu_processor(activated_account,
|
||||
key = cache_data_key(identifier, MetadataPointer.BALANCES_ADJUSTED)
|
||||
adjusted_balance = 45931650.64654012
|
||||
cache_data(key, json.dumps(adjusted_balance))
|
||||
resp = response(activated_account, 'ussd.kenya.account_balances', name, init_database, generic_ussd_session)
|
||||
resp = response(activated_account, 'ussd.account_balances', name, init_database, generic_ussd_session)
|
||||
tax_wei = to_wei(int(available_balance)) - int(adjusted_balance)
|
||||
tax = from_wei(int(tax_wei))
|
||||
assert resp == translation_for(key=with_fees,
|
||||
@ -84,28 +84,28 @@ def test_menu_processor(activated_account,
|
||||
if len(transaction_sets) >= 3:
|
||||
last_transaction_set = statement_transaction_set(preferred_language, transaction_sets[2])
|
||||
|
||||
display_key = 'ussd.kenya.first_transaction_set'
|
||||
display_key = 'ussd.first_transaction_set'
|
||||
ussd_menu = UssdMenu.find_by_name('first_transaction_set')
|
||||
name = ussd_menu.get('name')
|
||||
resp = response(activated_account, display_key, name, init_database, generic_ussd_session)
|
||||
|
||||
assert resp == translation_for(display_key, preferred_language, first_transaction_set=first_transaction_set)
|
||||
|
||||
display_key = 'ussd.kenya.middle_transaction_set'
|
||||
display_key = 'ussd.middle_transaction_set'
|
||||
ussd_menu = UssdMenu.find_by_name('middle_transaction_set')
|
||||
name = ussd_menu.get('name')
|
||||
resp = response(activated_account, display_key, name, init_database, generic_ussd_session)
|
||||
|
||||
assert resp == translation_for(display_key, preferred_language, middle_transaction_set=middle_transaction_set)
|
||||
|
||||
display_key = 'ussd.kenya.last_transaction_set'
|
||||
display_key = 'ussd.last_transaction_set'
|
||||
ussd_menu = UssdMenu.find_by_name('last_transaction_set')
|
||||
name = ussd_menu.get('name')
|
||||
resp = response(activated_account, display_key, name, init_database, generic_ussd_session)
|
||||
|
||||
assert resp == translation_for(display_key, preferred_language, last_transaction_set=last_transaction_set)
|
||||
|
||||
display_key = 'ussd.kenya.display_user_metadata'
|
||||
display_key = 'ussd.display_user_metadata'
|
||||
ussd_menu = UssdMenu.find_by_name('display_user_metadata')
|
||||
name = ussd_menu.get('name')
|
||||
identifier = bytes.fromhex(activated_account.blockchain_address)
|
||||
@ -114,7 +114,7 @@ def test_menu_processor(activated_account,
|
||||
resp = response(activated_account, display_key, name, init_database, generic_ussd_session)
|
||||
assert resp == parse_person_metadata(cached_person_metadata, display_key, preferred_language)
|
||||
|
||||
display_key = 'ussd.kenya.account_balances_pin_authorization'
|
||||
display_key = 'ussd.account_balances_pin_authorization'
|
||||
ussd_menu = UssdMenu.find_by_name('account_balances_pin_authorization')
|
||||
name = ussd_menu.get('name')
|
||||
resp = response(activated_account, display_key, name, init_database, generic_ussd_session)
|
||||
@ -122,11 +122,11 @@ def test_menu_processor(activated_account,
|
||||
|
||||
activated_account.failed_pin_attempts = 1
|
||||
resp = response(activated_account, display_key, name, init_database, generic_ussd_session)
|
||||
retry_pin_entry = translation_for('ussd.kenya.retry_pin_entry', preferred_language, remaining_attempts=2)
|
||||
retry_pin_entry = translation_for('ussd.retry_pin_entry', preferred_language, remaining_attempts=2)
|
||||
assert resp == translation_for(f'{display_key}.retry', preferred_language, retry_pin_entry=retry_pin_entry)
|
||||
activated_account.failed_pin_attempts = 0
|
||||
|
||||
display_key = 'ussd.kenya.start'
|
||||
display_key = 'ussd.start'
|
||||
ussd_menu = UssdMenu.find_by_name('start')
|
||||
name = ussd_menu.get('name')
|
||||
resp = response(activated_account, display_key, name, init_database, generic_ussd_session)
|
||||
@ -135,7 +135,7 @@ def test_menu_processor(activated_account,
|
||||
account_balance=available_balance,
|
||||
account_token_name=token_symbol)
|
||||
|
||||
display_key = 'ussd.kenya.start'
|
||||
display_key = 'ussd.start'
|
||||
ussd_menu = UssdMenu.find_by_name('start')
|
||||
name = ussd_menu.get('name')
|
||||
older_timestamp = (activated_account.created - datetime.timedelta(days=35))
|
||||
@ -144,7 +144,7 @@ def test_menu_processor(activated_account,
|
||||
response(activated_account, display_key, name, init_database, generic_ussd_session)
|
||||
assert mock_get_adjusted_balance['timestamp'] == int((datetime.datetime.now() - datetime.timedelta(days=30)).timestamp())
|
||||
|
||||
display_key = 'ussd.kenya.transaction_pin_authorization'
|
||||
display_key = 'ussd.transaction_pin_authorization'
|
||||
ussd_menu = UssdMenu.find_by_name('transaction_pin_authorization')
|
||||
name = ussd_menu.get('name')
|
||||
generic_ussd_session['data'] = {
|
||||
@ -163,7 +163,7 @@ def test_menu_processor(activated_account,
|
||||
token_symbol=token_symbol,
|
||||
sender_information=tx_sender_information)
|
||||
|
||||
display_key = 'ussd.kenya.exit_insufficient_balance'
|
||||
display_key = 'ussd.exit_insufficient_balance'
|
||||
ussd_menu = UssdMenu.find_by_name('exit_insufficient_balance')
|
||||
name = ussd_menu.get('name')
|
||||
generic_ussd_session['data'] = {
|
||||
@ -180,13 +180,13 @@ def test_menu_processor(activated_account,
|
||||
recipient_information=tx_recipient_information,
|
||||
token_balance=available_balance)
|
||||
|
||||
display_key = 'ussd.kenya.exit_invalid_menu_option'
|
||||
display_key = 'ussd.exit_invalid_menu_option'
|
||||
ussd_menu = UssdMenu.find_by_name('exit_invalid_menu_option')
|
||||
name = ussd_menu.get('name')
|
||||
resp = response(activated_account, display_key, name, init_database, generic_ussd_session)
|
||||
assert resp == translation_for(display_key, preferred_language, support_phone=Support.phone_number)
|
||||
|
||||
display_key = 'ussd.kenya.exit_successful_transaction'
|
||||
display_key = 'ussd.exit_successful_transaction'
|
||||
ussd_menu = UssdMenu.find_by_name('exit_successful_transaction')
|
||||
name = ussd_menu.get('name')
|
||||
generic_ussd_session['data'] = {
|
||||
|
@ -97,7 +97,7 @@ def test_handle_menu_operations(activated_account,
|
||||
valid_service_codes = load_config.get('USSD_SERVICE_CODE').split(",")
|
||||
preferred_language = i18n.config.get('fallback')
|
||||
resp = handle_menu_operations(chain_str, external_session_id, phone, None, valid_service_codes[0], init_database, '4444')
|
||||
assert resp == translation_for('ussd.kenya.account_creation_prompt', preferred_language)
|
||||
assert resp == translation_for('ussd.account_creation_prompt', preferred_language)
|
||||
cached_ussd_session = get_cached_data(external_session_id)
|
||||
ussd_session = json.loads(cached_ussd_session)
|
||||
assert ussd_session['msisdn'] == phone
|
||||
@ -118,5 +118,5 @@ def test_handle_menu_operations(activated_account,
|
||||
preferred_language = get_cached_preferred_language(activated_account.blockchain_address)
|
||||
persisted_ussd_session.state = 'enter_transaction_recipient'
|
||||
resp = handle_menu_operations(chain_str, external_session_id, phone, None, valid_service_codes[0], init_database, '1')
|
||||
assert resp == translation_for('ussd.kenya.enter_transaction_recipient', preferred_language)
|
||||
assert resp == translation_for('ussd.enter_transaction_recipient', preferred_language)
|
||||
|
||||
|
@ -32,7 +32,7 @@ def test_parse_person_metadata(activated_account, cache_person_metadata, cache_p
|
||||
cached_person_metadata = person_metadata.get_cached_metadata()
|
||||
person_metadata = json.loads(cached_person_metadata)
|
||||
preferred_language = get_cached_preferred_language(activated_account.blockchain_address)
|
||||
display_key = 'ussd.kenya.display_person_metadata'
|
||||
display_key = 'ussd.display_person_metadata'
|
||||
parsed_person_metadata = parse_person_metadata(cached_person_metadata,
|
||||
display_key,
|
||||
preferred_language)
|
||||
|
@ -8,8 +8,8 @@ from cic_ussd.notifications import Notifier
|
||||
|
||||
|
||||
@pytest.mark.parametrize("key, preferred_language, recipient, expected_message", [
|
||||
("ussd.kenya.exit", "en", "+254712345678", "END Thank you for using the service."),
|
||||
("ussd.kenya.exit", "sw", "+254712345678", "END Asante kwa kutumia huduma.")
|
||||
("ussd.exit", "en", "+254712345678", "END Thank you for using the service."),
|
||||
("ussd.exit", "sw", "+254712345678", "END Asante kwa kutumia huduma.")
|
||||
])
|
||||
def test_send_sms_notification(celery_session_worker,
|
||||
expected_message,
|
||||
|
@ -10,11 +10,11 @@ from cic_ussd.translation import translation_for
|
||||
|
||||
def test_translation_for(set_locale_files):
|
||||
english_translation = translation_for(
|
||||
key='ussd.kenya.exit_invalid_request',
|
||||
key='ussd.exit_invalid_request',
|
||||
preferred_language='en'
|
||||
)
|
||||
swahili_translation = translation_for(
|
||||
key='ussd.kenya.exit_invalid_request',
|
||||
key='ussd.exit_invalid_request',
|
||||
preferred_language='sw'
|
||||
)
|
||||
assert swahili_translation == 'END Chaguo si sahihi.'
|
||||
|
Loading…
Reference in New Issue
Block a user