Adds util functions for polling against redis cache.
This commit is contained in:
parent
b3fdd8b52c
commit
c3ee108c8b
@ -1,15 +1,22 @@
|
|||||||
# standard imports
|
# standard imports
|
||||||
import datetime
|
import datetime
|
||||||
import json
|
import json
|
||||||
|
import logging
|
||||||
|
import time
|
||||||
|
from typing import Union
|
||||||
|
|
||||||
# external imports
|
# external imports
|
||||||
|
from cic_types.condiments import MetadataPointer
|
||||||
from cic_types.models.person import get_contact_data_from_vcard
|
from cic_types.models.person import get_contact_data_from_vcard
|
||||||
from tinydb.table import Document
|
from tinydb.table import Document
|
||||||
|
|
||||||
# local imports
|
# local imports
|
||||||
|
from cic_ussd.cache import cache_data_key, get_cached_data
|
||||||
from cic_ussd.menu.ussd_menu import UssdMenu
|
from cic_ussd.menu.ussd_menu import UssdMenu
|
||||||
from cic_ussd.translation import translation_for
|
from cic_ussd.translation import translation_for
|
||||||
|
|
||||||
|
logg = logging.getLogger(__file__)
|
||||||
|
|
||||||
|
|
||||||
def latest_input(user_input: str) -> str:
|
def latest_input(user_input: str) -> str:
|
||||||
"""
|
"""
|
||||||
@ -76,3 +83,66 @@ def resume_last_ussd_session(last_state: str) -> Document:
|
|||||||
if last_state in non_reusable_states:
|
if last_state in non_reusable_states:
|
||||||
return UssdMenu.find_by_name('start')
|
return UssdMenu.find_by_name('start')
|
||||||
return UssdMenu.find_by_name(last_state)
|
return UssdMenu.find_by_name(last_state)
|
||||||
|
|
||||||
|
|
||||||
|
def wait_for_cache(identifier: Union[list, bytes], resource_name: str, salt: MetadataPointer, interval: int = 1, max_retry: int = 5):
|
||||||
|
"""
|
||||||
|
:param identifier:
|
||||||
|
:type identifier:
|
||||||
|
:param interval:
|
||||||
|
:type interval:
|
||||||
|
:param resource_name:
|
||||||
|
:type resource_name:
|
||||||
|
:param salt:
|
||||||
|
:type salt:
|
||||||
|
:param max_retry:
|
||||||
|
:type max_retry:
|
||||||
|
:return:
|
||||||
|
:rtype:
|
||||||
|
"""
|
||||||
|
key = cache_data_key(identifier=identifier, salt=salt)
|
||||||
|
resource = get_cached_data(key)
|
||||||
|
counter = 0
|
||||||
|
while resource is None:
|
||||||
|
logg.debug(f'Waiting for: {resource_name} at: {key}. Checking after: {interval} ...')
|
||||||
|
time.sleep(interval)
|
||||||
|
counter += 1
|
||||||
|
resource = get_cached_data(key)
|
||||||
|
if resource is not None:
|
||||||
|
logg.debug(f'{resource_name} now available.')
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
if counter == max_retry:
|
||||||
|
logg.debug(f'Could not find: {resource_name} within: {max_retry}')
|
||||||
|
break
|
||||||
|
|
||||||
|
|
||||||
|
def wait_for_session_data(resource_name: str, session_data_key: str, ussd_session: dict, interval: int = 1, max_retry: int = 5):
|
||||||
|
"""
|
||||||
|
:param interval:
|
||||||
|
:type interval:
|
||||||
|
:param resource_name:
|
||||||
|
:type resource_name:
|
||||||
|
:param session_data_key:
|
||||||
|
:type session_data_key:
|
||||||
|
:param ussd_session:
|
||||||
|
:type ussd_session:
|
||||||
|
:param max_retry:
|
||||||
|
:type max_retry:
|
||||||
|
:return:
|
||||||
|
:rtype:
|
||||||
|
"""
|
||||||
|
session_data = ussd_session.get('data').get(session_data_key)
|
||||||
|
counter = 0
|
||||||
|
while session_data is None:
|
||||||
|
logg.debug(f'Waiting for: {resource_name}. Checking after: {interval} ...')
|
||||||
|
time.sleep(interval)
|
||||||
|
counter += 1
|
||||||
|
session_data = ussd_session.get('data').get(session_data_key)
|
||||||
|
if session_data is not None:
|
||||||
|
logg.debug(f'{resource_name} now available.')
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
if counter == max_retry:
|
||||||
|
logg.debug(f'Could not find: {resource_name} within: {max_retry}')
|
||||||
|
break
|
||||||
|
Loading…
Reference in New Issue
Block a user