Refactors to use standalone poller.

This commit is contained in:
Philip Wafula 2022-01-07 13:21:24 +03:00
parent e11094b037
commit c31646a053
Signed by untrusted user: mango-habanero
GPG Key ID: B00CE9034DA19FB7
7 changed files with 7 additions and 80 deletions

View File

@ -14,7 +14,7 @@ from cic_ussd.account.chain import Chain
from cic_ussd.cache import cache_data, cache_data_key, get_cached_data
from cic_ussd.error import CachedDataNotFoundError, SeppukuError
from cic_ussd.metadata.tokens import query_token_info, query_token_metadata
from cic_ussd.processor.util import wait_for_cache
from cic_ussd.processor.poller import wait_for_cache
logg = logging.getLogger(__file__)

View File

@ -31,7 +31,8 @@ from cic_ussd.cache import cache_data_key, cache_data, get_cached_data
from cic_ussd.db.models.account import Account
from cic_ussd.metadata import PersonMetadata
from cic_ussd.phone_number import Support
from cic_ussd.processor.util import parse_person_metadata, ussd_menu_list, wait_for_session_data
from cic_ussd.processor.poller import wait_for_session_data
from cic_ussd.processor.util import parse_person_metadata, ussd_menu_list
from cic_ussd.session.ussd_session import save_session_data
from cic_ussd.state_machine.logic.language import preferred_langauge_from_selection
from cic_ussd.translation import translation_for

View File

@ -102,77 +102,3 @@ def ussd_menu_list(fallback: str, menu_list: list, split: int = 3) -> List[str]:
except IndexError:
menu_list_reprs.append(fallback)
return menu_list_reprs
def wait_for_cache(identifier: Union[list, bytes], resource_name: str, salt: MetadataPointer, interval: int = 1, max_retry: int = 5):
"""
:param identifier:
:type identifier:
:param interval:
:type interval:
:param resource_name:
:type resource_name:
:param salt:
:type salt:
:param max_retry:
:type max_retry:
:return:
:rtype:
"""
key = cache_data_key(identifier=identifier, salt=salt)
resource = get_cached_data(key)
counter = 0
while resource is None:
logg.debug(f'Waiting for: {resource_name} at: {key}. Checking after: {interval} ...')
time.sleep(interval)
counter += 1
resource = get_cached_data(key)
if resource is not None:
logg.debug(f'{resource_name} now available.')
break
else:
if counter == max_retry:
logg.debug(f'Could not find: {resource_name} within: {max_retry}')
break
def wait_for_session_data(resource_name: str, session_data_key: str, ussd_session: dict, interval: int = 1, max_retry: int = 5):
"""
:param interval:
:type interval:
:param resource_name:
:type resource_name:
:param session_data_key:
:type session_data_key:
:param ussd_session:
:type ussd_session:
:param max_retry:
:type max_retry:
:return:
:rtype:
"""
data = ussd_session.get('data')
data_poller = 0
while not data:
logg.debug(f'Waiting for data object on ussd session: {ussd_session.get("external_session_id")}')
logg.debug(f'Data poller at: {data_poller}. Checking again after: {interval} secs...')
time.sleep(interval)
data_poller += 1
if data:
logg.debug(f'Data object found, proceeding to poll for: {session_data_key}')
break
if data:
session_data_poller = 0
session_data = data.get(session_data_key)
while not session_data_key:
logg.debug(
f'Session data poller at: {data_poller} with max retry at: {max_retry}. Checking again after: {interval} secs...')
time.sleep(interval)
session_data_poller += 1
if session_data:
logg.debug(f'{resource_name} now available.')
break
elif session_data_poller >= max_retry:
logg.debug(f'Could not find data object within: {max_retry}')

View File

@ -11,7 +11,7 @@ from sqlalchemy.orm.session import Session
# local imports
from cic_ussd.cache import cache_data_key, get_cached_data
from cic_ussd.db.models.account import Account
from cic_ussd.processor.util import wait_for_cache, wait_for_session_data
from cic_ussd.processor.poller import wait_for_cache, wait_for_session_data
from cic_ussd.session.ussd_session import save_session_data
from cic_ussd.translation import Languages

View File

@ -15,7 +15,7 @@ from cic_ussd.db.models.account import Account
from cic_ussd.db.models.base import SessionBase
from cic_ussd.db.enum import AccountStatus
from cic_ussd.encoder import create_password_hash, check_password_hash
from cic_ussd.processor.util import wait_for_session_data
from cic_ussd.processor.poller import wait_for_session_data
from cic_ussd.session.ussd_session import create_or_update_session, persist_ussd_session

View File

@ -7,7 +7,7 @@ from sqlalchemy.orm.session import Session
# local imports
from cic_ussd.account.tokens import set_active_token
from cic_ussd.db.models.account import Account
from cic_ussd.processor.util import wait_for_session_data
from cic_ussd.processor.poller import wait_for_session_data
from cic_ussd.session.ussd_session import save_session_data

View File

@ -15,7 +15,7 @@ from cic_ussd.cache import Cache, cache_data, cache_data_key, get_cached_data
from cic_ussd.account.chain import Chain
from cic_ussd.db.models.base import SessionBase
from cic_ussd.db.models.account import Account
from cic_ussd.processor.util import wait_for_cache
from cic_ussd.processor.poller import wait_for_cache
from cic_ussd.account.statement import filter_statement_transactions
from cic_ussd.account.transaction import transaction_actors
from cic_ussd.account.tokens import (collate_token_metadata,