Merge branch 'philip/ussd-poler' into 'master'
Philip/ussd poler See merge request grassrootseconomics/cic-internal-integration!326
This commit is contained in:
commit
e69801ea08
@ -14,7 +14,7 @@ from cic_ussd.account.chain import Chain
|
|||||||
from cic_ussd.cache import cache_data, cache_data_key, get_cached_data
|
from cic_ussd.cache import cache_data, cache_data_key, get_cached_data
|
||||||
from cic_ussd.error import CachedDataNotFoundError, SeppukuError
|
from cic_ussd.error import CachedDataNotFoundError, SeppukuError
|
||||||
from cic_ussd.metadata.tokens import query_token_info, query_token_metadata
|
from cic_ussd.metadata.tokens import query_token_info, query_token_metadata
|
||||||
from cic_ussd.processor.util import wait_for_cache
|
from cic_ussd.processor.poller import wait_for_cache
|
||||||
|
|
||||||
logg = logging.getLogger(__file__)
|
logg = logging.getLogger(__file__)
|
||||||
|
|
||||||
|
@ -52,4 +52,5 @@ class UnknownUssdRecipient(Exception):
|
|||||||
"""Raised when a recipient of a transaction is not known to the ussd application."""
|
"""Raised when a recipient of a transaction is not known to the ussd application."""
|
||||||
|
|
||||||
|
|
||||||
|
class MaxRetryReached(Exception):
    """Raised when the maximum number of retries is reached while polling for
    the availability of a resource.

    Raised by the poller as ``MaxRetryReached(collected_values, expected_value)``,
    i.e. with the values collected on each failed attempt and the last value
    (or error) observed, so both surface in the exception message.
    """
||||||
|
@ -31,7 +31,8 @@ from cic_ussd.cache import cache_data_key, cache_data, get_cached_data
|
|||||||
from cic_ussd.db.models.account import Account
|
from cic_ussd.db.models.account import Account
|
||||||
from cic_ussd.metadata import PersonMetadata
|
from cic_ussd.metadata import PersonMetadata
|
||||||
from cic_ussd.phone_number import Support
|
from cic_ussd.phone_number import Support
|
||||||
from cic_ussd.processor.util import parse_person_metadata, ussd_menu_list, wait_for_session_data
|
from cic_ussd.processor.poller import wait_for_session_data
|
||||||
|
from cic_ussd.processor.util import parse_person_metadata, ussd_menu_list
|
||||||
from cic_ussd.session.ussd_session import save_session_data
|
from cic_ussd.session.ussd_session import save_session_data
|
||||||
from cic_ussd.state_machine.logic.language import preferred_langauge_from_selection
|
from cic_ussd.state_machine.logic.language import preferred_langauge_from_selection
|
||||||
from cic_ussd.translation import translation_for
|
from cic_ussd.translation import translation_for
|
||||||
|
104
apps/cic-ussd/cic_ussd/processor/poller.py
Normal file
104
apps/cic-ussd/cic_ussd/processor/poller.py
Normal file
@ -0,0 +1,104 @@
|
|||||||
|
# standard imports
|
||||||
|
import logging
|
||||||
|
import time
|
||||||
|
from queue import Queue
|
||||||
|
from typing import Callable, Dict, Optional, Tuple, Union
|
||||||
|
|
||||||
|
# external imports
|
||||||
|
from cic_types.condiments import MetadataPointer
|
||||||
|
|
||||||
|
# local imports
|
||||||
|
from cic_ussd.cache import cache_data_key, get_cached_data
|
||||||
|
from cic_ussd.error import MaxRetryReached
|
||||||
|
|
||||||
|
|
||||||
|
logg = logging.getLogger()
|
||||||
|
|
||||||
|
|
||||||
|
# adapted from https://github.com/justiniso/polling/blob/master/polling.py
|
||||||
|
# opted not to use the package to reduce dependency
|
||||||
|
def poller(args: Optional[Tuple],
           interval: int,
           kwargs: Optional[Dict],
           max_retry: int,
           target: Callable[..., Union[Dict, str]]):
    """Call ``target`` every ``interval`` seconds until it yields a usable value.

    A value is considered available when it is truthy, or when it is an empty
    dict (a present-but-empty resource). Each failed attempt's observed value
    (or error) is collected so it can be reported on failure.

    :param args: positional arguments passed to ``target``; takes precedence
        over ``kwargs`` when both are provided.
    :type args: Optional[Tuple]
    :param interval: seconds to sleep between consecutive attempts.
    :type interval: int
    :param kwargs: keyword arguments passed to ``target`` when ``args`` is falsy.
    :type kwargs: Optional[Dict]
    :param max_retry: maximum number of failed attempts before giving up.
    :type max_retry: int
    :param target: callable polled for the resource.
    :type target: Callable[..., Union[Dict, str]]
    :raises MaxRetryReached: when ``max_retry`` attempts have failed; carries
        the collected per-attempt values and the last value/error seen.
    """
    collected_values: list = []
    expected_value = None
    tries = 0

    while True:
        if tries >= max_retry:
            raise MaxRetryReached(collected_values, expected_value)
        try:
            if args:
                value = target(*args)
            elif kwargs:
                value = target(**kwargs)
            else:
                value = target()
            expected_value = value
        # NOTE(review): fixed from ``except () as error`` — an empty exception
        # tuple can never match, so target errors propagated immediately and
        # the retry-on-error branch below was dead code. Catching Exception
        # makes failing attempts count as retries, as the adapted polling
        # library intends.
        except Exception as error:
            expected_value = error
        else:
            if bool(value) or value == {}:
                logg.debug(f'Resource: {expected_value} now available.')
                break
        collected_values.append(expected_value)
        logg.debug(f'Collected values are: {collected_values}')
        tries += 1
        time.sleep(interval)
|
||||||
|
|
||||||
|
|
||||||
|
def wait_for_cache(identifier: Union[list, bytes],
                   resource_name: str,
                   salt: MetadataPointer,
                   interval: int = 1,
                   max_retry: int = 5):
    """Block until the cache entry keyed by ``identifier`` and ``salt`` exists.

    Builds the cache key and polls ``get_cached_data`` for it via the poller.

    :param identifier: identifier bytes (or list of identifiers) the cache key
        is derived from.
    :type identifier: Union[list, bytes]
    :param interval: seconds between consecutive cache lookups.
    :type interval: int
    :param resource_name: human-readable name of the awaited resource; used in
        log output only.
    :type resource_name: str
    :param salt: metadata pointer used to salt the cache key.
    :type salt: MetadataPointer
    :param max_retry: maximum number of lookups before giving up.
    :type max_retry: int
    :raises MaxRetryReached: when the entry is still absent after ``max_retry``
        lookups.
    :return: None; returns once the cached value is available.
    :rtype: None
    """
    key: str = cache_data_key(identifier=identifier, salt=salt)
    logg.debug(f'Polling for resource: {resource_name} at: {key} every: {interval} second(s) for {max_retry} seconds.')
    poller(args=(key,), interval=interval, kwargs=None, max_retry=max_retry, target=get_cached_data)
|
||||||
|
|
||||||
|
|
||||||
|
def wait_for_session_data(resource_name: str,
                          session_data_key: str,
                          ussd_session: dict,
                          interval: int = 1,
                          max_retry: int = 5):
    """Block until ``session_data_key`` is present in the ussd session's data.

    Polls in two phases: first for the session's ``'data'`` element, then for
    the requested key inside it. Either phase may raise on exhaustion.

    :param interval: seconds between consecutive polls.
    :type interval: int
    :param resource_name: human-readable name of the awaited resource; used in
        log output only.
    :type resource_name: str
    :param session_data_key: key looked up inside the session's data dict.
    :type session_data_key: str
    :param ussd_session: in-memory ussd session representation.
    :type ussd_session: dict
    :param max_retry: maximum number of polls per phase before giving up.
    :type max_retry: int
    :raises MaxRetryReached: when the data element or the keyed entry does not
        appear within ``max_retry`` polls.
    :return: None; returns once the session data entry is available.
    :rtype: None
    """
    # poll for data element first
    logg.debug(f'Data poller with max retry at: {max_retry}. Checking for every: {interval} seconds.')
    poller(args=('data',), interval=interval, kwargs=None, max_retry=max_retry, target=ussd_session.get)

    # poll for session data element
    # safe to dereference here: the first phase only returns once 'data' is
    # present (an empty dict also satisfies it).
    get_session_data = ussd_session.get('data').get
    logg.debug(f'Session data poller for: {resource_name} with max retry at: {max_retry}. Checking for every: {interval} seconds.')
    poller(args=(session_data_key,), interval=interval, kwargs=None, max_retry=max_retry, target=get_session_data)
|
||||||
|
|
@ -102,77 +102,3 @@ def ussd_menu_list(fallback: str, menu_list: list, split: int = 3) -> List[str]:
|
|||||||
except IndexError:
|
except IndexError:
|
||||||
menu_list_reprs.append(fallback)
|
menu_list_reprs.append(fallback)
|
||||||
return menu_list_reprs
|
return menu_list_reprs
|
||||||
|
|
||||||
|
|
||||||
def wait_for_cache(identifier: Union[list, bytes], resource_name: str, salt: MetadataPointer, interval: int = 1, max_retry: int = 5):
|
|
||||||
"""
|
|
||||||
:param identifier:
|
|
||||||
:type identifier:
|
|
||||||
:param interval:
|
|
||||||
:type interval:
|
|
||||||
:param resource_name:
|
|
||||||
:type resource_name:
|
|
||||||
:param salt:
|
|
||||||
:type salt:
|
|
||||||
:param max_retry:
|
|
||||||
:type max_retry:
|
|
||||||
:return:
|
|
||||||
:rtype:
|
|
||||||
"""
|
|
||||||
key = cache_data_key(identifier=identifier, salt=salt)
|
|
||||||
resource = get_cached_data(key)
|
|
||||||
counter = 0
|
|
||||||
while resource is None:
|
|
||||||
logg.debug(f'Waiting for: {resource_name} at: {key}. Checking after: {interval} ...')
|
|
||||||
time.sleep(interval)
|
|
||||||
counter += 1
|
|
||||||
resource = get_cached_data(key)
|
|
||||||
if resource is not None:
|
|
||||||
logg.debug(f'{resource_name} now available.')
|
|
||||||
break
|
|
||||||
else:
|
|
||||||
if counter == max_retry:
|
|
||||||
logg.debug(f'Could not find: {resource_name} within: {max_retry}')
|
|
||||||
break
|
|
||||||
|
|
||||||
|
|
||||||
def wait_for_session_data(resource_name: str, session_data_key: str, ussd_session: dict, interval: int = 1, max_retry: int = 5):
|
|
||||||
"""
|
|
||||||
:param interval:
|
|
||||||
:type interval:
|
|
||||||
:param resource_name:
|
|
||||||
:type resource_name:
|
|
||||||
:param session_data_key:
|
|
||||||
:type session_data_key:
|
|
||||||
:param ussd_session:
|
|
||||||
:type ussd_session:
|
|
||||||
:param max_retry:
|
|
||||||
:type max_retry:
|
|
||||||
:return:
|
|
||||||
:rtype:
|
|
||||||
"""
|
|
||||||
data = ussd_session.get('data')
|
|
||||||
data_poller = 0
|
|
||||||
while not data:
|
|
||||||
logg.debug(f'Waiting for data object on ussd session: {ussd_session.get("external_session_id")}')
|
|
||||||
logg.debug(f'Data poller at: {data_poller}. Checking again after: {interval} secs...')
|
|
||||||
time.sleep(interval)
|
|
||||||
data_poller += 1
|
|
||||||
if data:
|
|
||||||
logg.debug(f'Data object found, proceeding to poll for: {session_data_key}')
|
|
||||||
break
|
|
||||||
if data:
|
|
||||||
session_data_poller = 0
|
|
||||||
session_data = data.get(session_data_key)
|
|
||||||
while not session_data_key:
|
|
||||||
logg.debug(
|
|
||||||
f'Session data poller at: {data_poller} with max retry at: {max_retry}. Checking again after: {interval} secs...')
|
|
||||||
time.sleep(interval)
|
|
||||||
session_data_poller += 1
|
|
||||||
|
|
||||||
if session_data:
|
|
||||||
logg.debug(f'{resource_name} now available.')
|
|
||||||
break
|
|
||||||
|
|
||||||
elif session_data_poller >= max_retry:
|
|
||||||
logg.debug(f'Could not find data object within: {max_retry}')
|
|
||||||
|
@ -11,7 +11,7 @@ from sqlalchemy.orm.session import Session
|
|||||||
# local imports
|
# local imports
|
||||||
from cic_ussd.cache import cache_data_key, get_cached_data
|
from cic_ussd.cache import cache_data_key, get_cached_data
|
||||||
from cic_ussd.db.models.account import Account
|
from cic_ussd.db.models.account import Account
|
||||||
from cic_ussd.processor.util import wait_for_cache, wait_for_session_data
|
from cic_ussd.processor.poller import wait_for_cache, wait_for_session_data
|
||||||
from cic_ussd.session.ussd_session import save_session_data
|
from cic_ussd.session.ussd_session import save_session_data
|
||||||
from cic_ussd.translation import Languages
|
from cic_ussd.translation import Languages
|
||||||
|
|
||||||
|
@ -15,7 +15,7 @@ from cic_ussd.db.models.account import Account
|
|||||||
from cic_ussd.db.models.base import SessionBase
|
from cic_ussd.db.models.base import SessionBase
|
||||||
from cic_ussd.db.enum import AccountStatus
|
from cic_ussd.db.enum import AccountStatus
|
||||||
from cic_ussd.encoder import create_password_hash, check_password_hash
|
from cic_ussd.encoder import create_password_hash, check_password_hash
|
||||||
from cic_ussd.processor.util import wait_for_session_data
|
from cic_ussd.processor.poller import wait_for_session_data
|
||||||
from cic_ussd.session.ussd_session import create_or_update_session, persist_ussd_session
|
from cic_ussd.session.ussd_session import create_or_update_session, persist_ussd_session
|
||||||
|
|
||||||
|
|
||||||
|
@ -7,7 +7,7 @@ from sqlalchemy.orm.session import Session
|
|||||||
# local imports
|
# local imports
|
||||||
from cic_ussd.account.tokens import set_active_token
|
from cic_ussd.account.tokens import set_active_token
|
||||||
from cic_ussd.db.models.account import Account
|
from cic_ussd.db.models.account import Account
|
||||||
from cic_ussd.processor.util import wait_for_session_data
|
from cic_ussd.processor.poller import wait_for_session_data
|
||||||
from cic_ussd.session.ussd_session import save_session_data
|
from cic_ussd.session.ussd_session import save_session_data
|
||||||
|
|
||||||
|
|
||||||
|
@ -15,7 +15,7 @@ from cic_ussd.cache import Cache, cache_data, cache_data_key, get_cached_data
|
|||||||
from cic_ussd.account.chain import Chain
|
from cic_ussd.account.chain import Chain
|
||||||
from cic_ussd.db.models.base import SessionBase
|
from cic_ussd.db.models.base import SessionBase
|
||||||
from cic_ussd.db.models.account import Account
|
from cic_ussd.db.models.account import Account
|
||||||
from cic_ussd.processor.util import wait_for_cache
|
from cic_ussd.processor.poller import wait_for_cache
|
||||||
from cic_ussd.account.statement import filter_statement_transactions
|
from cic_ussd.account.statement import filter_statement_transactions
|
||||||
from cic_ussd.account.transaction import transaction_actors
|
from cic_ussd.account.transaction import transaction_actors
|
||||||
from cic_ussd.account.tokens import (collate_token_metadata,
|
from cic_ussd.account.tokens import (collate_token_metadata,
|
||||||
|
69
apps/cic-ussd/tests/cic_ussd/processor/test_poller.py
Normal file
69
apps/cic-ussd/tests/cic_ussd/processor/test_poller.py
Normal file
@ -0,0 +1,69 @@
|
|||||||
|
# standard imports
|
||||||
|
import logging
|
||||||
|
import time
|
||||||
|
from queue import Queue
|
||||||
|
|
||||||
|
# external imports
|
||||||
|
import pytest
|
||||||
|
from cic_types.condiments import MetadataPointer
|
||||||
|
|
||||||
|
# local imports
|
||||||
|
from cic_ussd.cache import cache_data, cache_data_key, get_cached_data
|
||||||
|
from cic_ussd.error import MaxRetryReached
|
||||||
|
from cic_ussd.processor.poller import poller, wait_for_cache, wait_for_session_data
|
||||||
|
|
||||||
|
# test imports
|
||||||
|
|
||||||
|
|
||||||
|
def test_poller(activated_account, caplog, init_cache, token_symbol):
    # Polling an absent cache key must exhaust retries and raise
    # MaxRetryReached carrying one collected value (None) per attempt.
    caplog.set_level(logging.DEBUG)
    identifier = bytes.fromhex(activated_account.blockchain_address)
    key = cache_data_key(identifier, MetadataPointer.TOKEN_ACTIVE)
    with pytest.raises(MaxRetryReached) as error:
        interval = 1
        max_retry = 3
        # three attempts, each observing None
        collected_values = [None, None, None]
        poller(args=(key,), interval=interval, kwargs=None, max_retry=max_retry, target=get_cached_data)
    assert str(error.value) == str(MaxRetryReached(collected_values, None))
    # once the value is cached, the same poll succeeds and logs availability
    cache_data(key, token_symbol)
    poller(args=(key,), interval=interval, kwargs=None, max_retry=max_retry, target=get_cached_data)
    assert f'Resource: {token_symbol} now available.' in caplog.text
|
||||||
|
|
||||||
|
|
||||||
|
def test_wait_for_cache(activated_account, caplog, init_cache, token_symbol):
    # Pre-cache the value, then verify wait_for_cache returns immediately and
    # emits its polling log line.
    caplog.set_level(logging.DEBUG)
    identifier = bytes.fromhex(activated_account.blockchain_address)
    key = cache_data_key(identifier, MetadataPointer.TOKEN_ACTIVE)
    cache_data(key, token_symbol)
    interval = 1
    max_retry = 3
    resource_name = 'Active Token'
    wait_for_cache(identifier, resource_name, MetadataPointer.TOKEN_ACTIVE, interval, max_retry)
    assert f'Polling for resource: {resource_name} at: {key} every: {interval} second(s) for {max_retry} seconds.' in caplog.text
|
||||||
|
|
||||||
|
|
||||||
|
def test_wait_for_session_data(activated_account, caplog, generic_ussd_session):
    # Phase 1: with no 'data' element at all, the first poll phase must
    # exhaust retries and raise.
    caplog.set_level(logging.DEBUG)
    generic_ussd_session.__delitem__('data')
    interval = 1
    max_retry = 3
    collected_values = [None, None, None]
    resource_name = 'Foo Data'
    session_data_key = 'foo'
    with pytest.raises(MaxRetryReached) as error:
        wait_for_session_data(resource_name, session_data_key, generic_ussd_session, interval, max_retry)
    assert str(error.value) == str(MaxRetryReached(collected_values, None))
    assert f'Data poller with max retry at: {max_retry}. Checking for every: {interval} seconds.' in caplog.text
    # Phase 2: an empty 'data' dict satisfies the first phase, so the failure
    # now comes from the second (session-data-key) phase.
    generic_ussd_session['data'] = {}
    with pytest.raises(MaxRetryReached) as error:
        collected_values = [None, None, None]
        wait_for_session_data(resource_name, session_data_key, generic_ussd_session, interval, max_retry)
    assert f'Data poller with max retry at: {max_retry}. Checking for every: {interval} seconds.' in caplog.text
    assert f'Session data poller for: {resource_name} with max retry at: {max_retry}. Checking for every: {interval} seconds.' in caplog.text
    assert str(error.value) == str(MaxRetryReached(collected_values, None))
    # Phase 3: with the key present, both phases succeed and the resource is
    # logged as available.
    expected_value = 'bar'
    generic_ussd_session['data'] = {'foo': expected_value}
    wait_for_session_data(resource_name, session_data_key, generic_ussd_session, interval, max_retry)
    assert f'Data poller with max retry at: {max_retry}. Checking for every: {interval} seconds.' in caplog.text
    assert f'Session data poller for: {resource_name} with max retry at: {max_retry}. Checking for every: {interval} seconds.' in caplog.text
    assert f'Resource: {expected_value} now available.' in caplog.text
|
Loading…
Reference in New Issue
Block a user