diff --git a/apps/cic-ussd/cic_ussd/account/tokens.py b/apps/cic-ussd/cic_ussd/account/tokens.py index 288b4d9c..68ba1ace 100644 --- a/apps/cic-ussd/cic_ussd/account/tokens.py +++ b/apps/cic-ussd/cic_ussd/account/tokens.py @@ -14,7 +14,7 @@ from cic_ussd.account.chain import Chain from cic_ussd.cache import cache_data, cache_data_key, get_cached_data from cic_ussd.error import CachedDataNotFoundError, SeppukuError from cic_ussd.metadata.tokens import query_token_info, query_token_metadata -from cic_ussd.processor.util import wait_for_cache +from cic_ussd.processor.poller import wait_for_cache logg = logging.getLogger(__file__) diff --git a/apps/cic-ussd/cic_ussd/error.py b/apps/cic-ussd/cic_ussd/error.py index 31bbc745..29dc2ef4 100644 --- a/apps/cic-ussd/cic_ussd/error.py +++ b/apps/cic-ussd/cic_ussd/error.py @@ -52,4 +52,5 @@ class UnknownUssdRecipient(Exception): """Raised when a recipient of a transaction is not known to the ussd application.""" - +class MaxRetryReached(Exception): + """Raised when the maximum number of retries defined for polling for the availability of a resource.""" diff --git a/apps/cic-ussd/cic_ussd/processor/menu.py b/apps/cic-ussd/cic_ussd/processor/menu.py index 31a734d9..fd5bc2f7 100644 --- a/apps/cic-ussd/cic_ussd/processor/menu.py +++ b/apps/cic-ussd/cic_ussd/processor/menu.py @@ -31,7 +31,8 @@ from cic_ussd.cache import cache_data_key, cache_data, get_cached_data from cic_ussd.db.models.account import Account from cic_ussd.metadata import PersonMetadata from cic_ussd.phone_number import Support -from cic_ussd.processor.util import parse_person_metadata, ussd_menu_list, wait_for_session_data +from cic_ussd.processor.poller import wait_for_session_data +from cic_ussd.processor.util import parse_person_metadata, ussd_menu_list from cic_ussd.session.ussd_session import save_session_data from cic_ussd.state_machine.logic.language import preferred_langauge_from_selection from cic_ussd.translation import translation_for diff --git a/apps/cic-ussd/cic_ussd/processor/poller.py b/apps/cic-ussd/cic_ussd/processor/poller.py new file mode 100644 index 00000000..6429b15a --- /dev/null +++ b/apps/cic-ussd/cic_ussd/processor/poller.py @@ -0,0 +1,104 @@ +# standard imports +import logging +import time +from queue import Queue +from typing import Callable, Dict, Optional, Tuple, Union + +# external imports +from cic_types.condiments import MetadataPointer + +# local imports +from cic_ussd.cache import cache_data_key, get_cached_data +from cic_ussd.error import MaxRetryReached + + +logg = logging.getLogger() + + +# adapted from https://github.com/justiniso/polling/blob/master/polling.py +# opted not to use the package to reduce dependency +def poller(args: Optional[Tuple], + interval: int, + kwargs: Optional[Dict], + max_retry: int, + target: Callable[..., Union[Dict, str]]): + """""" + collected_values: list = [] + expected_value = None + tries = 0 + + while True: + if tries >= max_retry: + raise MaxRetryReached(collected_values, expected_value) + try: + if args: + value = target(*args) + elif kwargs: + value = target(**kwargs) + else: + value = target() + expected_value = value + except () as error: + expected_value = error + else: + if bool(value) or value == {}: + logg.debug(f'Resource: {expected_value} now available.') + break + collected_values.append(expected_value) + logg.debug(f'Collected values are: {collected_values}') + tries += 1 + time.sleep(interval) + + +def wait_for_cache(identifier: Union[list, bytes], + resource_name: str, + salt: MetadataPointer, + interval: int = 1, + max_retry: int = 5): + """ + :param identifier: + :type identifier: + :param interval: + :type interval: + :param resource_name: + :type resource_name: + :param salt: + :type salt: + :param max_retry: + :type max_retry: + :return: + :rtype: + """ + key: str = cache_data_key(identifier=identifier, salt=salt) + logg.debug(f'Polling for resource: {resource_name} at: {key} every: {interval} second(s) for {max_retry} seconds.') + poller(args=(key,), interval=interval, kwargs=None, max_retry=max_retry, target=get_cached_data) + + +def wait_for_session_data(resource_name: str, + session_data_key: str, + ussd_session: dict, + interval: int = 1, + max_retry: int = 5): + """ + :param interval: + :type interval: + :param resource_name: + :type resource_name: + :param session_data_key: + :type session_data_key: + :param ussd_session: + :type ussd_session: + :param max_retry: + :type max_retry: + :return: + :rtype: + """ + # poll for data element first + logg.debug(f'Data poller with max retry at: {max_retry}. Checking for every: {interval} seconds.') + poller(args=('data',), interval=interval, kwargs=None, max_retry=max_retry, target=ussd_session.get) + + # poll for session data element + get_session_data = ussd_session.get('data').get + logg.debug(f'Session data poller for: {resource_name} with max retry at: {max_retry}. Checking for every: {interval} seconds.') + poller(args=(session_data_key,), interval=interval, kwargs=None, max_retry=max_retry, target=get_session_data) + diff --git a/apps/cic-ussd/cic_ussd/processor/util.py b/apps/cic-ussd/cic_ussd/processor/util.py index 28c50c63..37690baa 100644 --- a/apps/cic-ussd/cic_ussd/processor/util.py +++ b/apps/cic-ussd/cic_ussd/processor/util.py @@ -102,77 +102,3 @@ def ussd_menu_list(fallback: str, menu_list: list, split: int = 3) -> List[str]: except IndexError: menu_list_reprs.append(fallback) return menu_list_reprs - - -def wait_for_cache(identifier: Union[list, bytes], resource_name: str, salt: MetadataPointer, interval: int = 1, max_retry: int = 5): - """ - :param identifier: - :type identifier: - :param interval: - :type interval: - :param resource_name: - :type resource_name: - :param salt: - :type salt: - :param max_retry: - :type max_retry: - :return: - :rtype: - """ - key = cache_data_key(identifier=identifier, salt=salt) - resource = get_cached_data(key) - counter = 0 - while resource is None: - logg.debug(f'Waiting for: {resource_name} at: {key}. Checking after: {interval} ...') - time.sleep(interval) - counter += 1 - resource = get_cached_data(key) - if resource is not None: - logg.debug(f'{resource_name} now available.') - break - else: - if counter == max_retry: - logg.debug(f'Could not find: {resource_name} within: {max_retry}') - break - - -def wait_for_session_data(resource_name: str, session_data_key: str, ussd_session: dict, interval: int = 1, max_retry: int = 5): - """ - :param interval: - :type interval: - :param resource_name: - :type resource_name: - :param session_data_key: - :type session_data_key: - :param ussd_session: - :type ussd_session: - :param max_retry: - :type max_retry: - :return: - :rtype: - """ - data = ussd_session.get('data') - data_poller = 0 - while not data: - logg.debug(f'Waiting for data object on ussd session: {ussd_session.get("external_session_id")}') - logg.debug(f'Data poller at: {data_poller}. Checking again after: {interval} secs...') - time.sleep(interval) - data_poller += 1 - if data: - logg.debug(f'Data object found, proceeding to poll for: {session_data_key}') - break - if data: - session_data_poller = 0 - session_data = data.get(session_data_key) - while not session_data_key: - logg.debug( - f'Session data poller at: {data_poller} with max retry at: {max_retry}. Checking again after: {interval} secs...') - time.sleep(interval) - session_data_poller += 1 - - if session_data: - logg.debug(f'{resource_name} now available.') - break - - elif session_data_poller >= max_retry: - logg.debug(f'Could not find data object within: {max_retry}') diff --git a/apps/cic-ussd/cic_ussd/state_machine/logic/language.py b/apps/cic-ussd/cic_ussd/state_machine/logic/language.py index 0fb66ec1..d1d484ba 100644 --- a/apps/cic-ussd/cic_ussd/state_machine/logic/language.py +++ b/apps/cic-ussd/cic_ussd/state_machine/logic/language.py @@ -11,7 +11,7 @@ from sqlalchemy.orm.session import Session # local imports from cic_ussd.cache import cache_data_key, get_cached_data from cic_ussd.db.models.account import Account -from cic_ussd.processor.util import wait_for_cache, wait_for_session_data +from cic_ussd.processor.poller import wait_for_cache, wait_for_session_data from cic_ussd.session.ussd_session import save_session_data from cic_ussd.translation import Languages diff --git a/apps/cic-ussd/cic_ussd/state_machine/logic/pin.py b/apps/cic-ussd/cic_ussd/state_machine/logic/pin.py index b00cea0a..2be9fd99 100644 --- a/apps/cic-ussd/cic_ussd/state_machine/logic/pin.py +++ b/apps/cic-ussd/cic_ussd/state_machine/logic/pin.py @@ -15,7 +15,7 @@ from cic_ussd.db.models.account import Account from cic_ussd.db.models.base import SessionBase from cic_ussd.db.enum import AccountStatus from cic_ussd.encoder import create_password_hash, check_password_hash -from cic_ussd.processor.util import wait_for_session_data +from cic_ussd.processor.poller import wait_for_session_data from cic_ussd.session.ussd_session import create_or_update_session, persist_ussd_session diff --git a/apps/cic-ussd/cic_ussd/state_machine/logic/tokens.py b/apps/cic-ussd/cic_ussd/state_machine/logic/tokens.py index f8887813..6c0ec419 100644 --- a/apps/cic-ussd/cic_ussd/state_machine/logic/tokens.py +++ b/apps/cic-ussd/cic_ussd/state_machine/logic/tokens.py @@ -7,7 +7,7 @@ from sqlalchemy.orm.session import Session # local imports from cic_ussd.account.tokens import set_active_token from cic_ussd.db.models.account import Account -from cic_ussd.processor.util import wait_for_session_data +from cic_ussd.processor.poller import wait_for_session_data from cic_ussd.session.ussd_session import save_session_data diff --git a/apps/cic-ussd/cic_ussd/tasks/callback_handler.py b/apps/cic-ussd/cic_ussd/tasks/callback_handler.py index 827e1b46..a3363048 100644 --- a/apps/cic-ussd/cic_ussd/tasks/callback_handler.py +++ b/apps/cic-ussd/cic_ussd/tasks/callback_handler.py @@ -15,7 +15,7 @@ from cic_ussd.cache import Cache, cache_data, cache_data_key, get_cached_data from cic_ussd.account.chain import Chain from cic_ussd.db.models.base import SessionBase from cic_ussd.db.models.account import Account -from cic_ussd.processor.util import wait_for_cache +from cic_ussd.processor.poller import wait_for_cache from cic_ussd.account.statement import filter_statement_transactions from cic_ussd.account.transaction import transaction_actors from cic_ussd.account.tokens import (collate_token_metadata, diff --git a/apps/cic-ussd/tests/cic_ussd/processor/test_poller.py b/apps/cic-ussd/tests/cic_ussd/processor/test_poller.py new file mode 100644 index 00000000..c1d86f53 --- /dev/null +++ b/apps/cic-ussd/tests/cic_ussd/processor/test_poller.py @@ -0,0 +1,69 @@ +# standard imports +import logging +import time +from queue import Queue + +# external imports +import pytest +from cic_types.condiments import MetadataPointer + +# local imports +from cic_ussd.cache import cache_data, cache_data_key, get_cached_data +from cic_ussd.error import MaxRetryReached +from cic_ussd.processor.poller import poller, wait_for_cache, wait_for_session_data + +# test imports + + +def test_poller(activated_account, caplog, init_cache, token_symbol): + caplog.set_level(logging.DEBUG) + identifier = bytes.fromhex(activated_account.blockchain_address) + key = cache_data_key(identifier, MetadataPointer.TOKEN_ACTIVE) + with pytest.raises(MaxRetryReached) as error: + interval = 1 + max_retry = 3 + collected_values = [None, None, None] + poller(args=(key,), interval=interval, kwargs=None, max_retry=max_retry, target=get_cached_data) + assert str(error.value) == str(MaxRetryReached(collected_values, None)) + cache_data(key, token_symbol) + poller(args=(key,), interval=interval, kwargs=None, max_retry=max_retry, target=get_cached_data) + assert f'Resource: {token_symbol} now available.' in caplog.text + + +def test_wait_for_cache(activated_account, caplog, init_cache, token_symbol): + caplog.set_level(logging.DEBUG) + identifier = bytes.fromhex(activated_account.blockchain_address) + key = cache_data_key(identifier, MetadataPointer.TOKEN_ACTIVE) + cache_data(key, token_symbol) + interval = 1 + max_retry = 3 + resource_name = 'Active Token' + wait_for_cache(identifier, resource_name, MetadataPointer.TOKEN_ACTIVE, interval, max_retry) + assert f'Polling for resource: {resource_name} at: {key} every: {interval} second(s) for {max_retry} seconds.' in caplog.text + + +def test_wait_for_session_data(activated_account, caplog, generic_ussd_session): + caplog.set_level(logging.DEBUG) + generic_ussd_session.__delitem__('data') + interval = 1 + max_retry = 3 + collected_values = [None, None, None] + resource_name = 'Foo Data' + session_data_key = 'foo' + with pytest.raises(MaxRetryReached) as error: + wait_for_session_data(resource_name, session_data_key, generic_ussd_session, interval, max_retry) + assert str(error.value) == str(MaxRetryReached(collected_values, None)) + assert f'Data poller with max retry at: {max_retry}. Checking for every: {interval} seconds.' in caplog.text + generic_ussd_session['data'] = {} + with pytest.raises(MaxRetryReached) as error: + collected_values = [None, None, None] + wait_for_session_data(resource_name, session_data_key, generic_ussd_session, interval, max_retry) + assert f'Data poller with max retry at: {max_retry}. Checking for every: {interval} seconds.' in caplog.text + assert f'Session data poller for: {resource_name} with max retry at: {max_retry}. Checking for every: {interval} seconds.' in caplog.text + assert str(error.value) == str(MaxRetryReached(collected_values, None)) + expected_value = 'bar' + generic_ussd_session['data'] = {'foo': expected_value} + wait_for_session_data(resource_name, session_data_key, generic_ussd_session, interval, max_retry) + assert f'Data poller with max retry at: {max_retry}. Checking for every: {interval} seconds.' in caplog.text + assert f'Session data poller for: {resource_name} with max retry at: {max_retry}. Checking for every: {interval} seconds.' in caplog.text + assert f'Resource: {expected_value} now available.' in caplog.text