Adds poller.

This commit is contained in:
Philip Wafula 2022-01-07 13:16:32 +03:00
parent 5d1a30021a
commit 06ea97ee81
Signed by untrusted user: mango-habanero
GPG Key ID: B00CE9034DA19FB7
2 changed files with 173 additions and 0 deletions


@@ -0,0 +1,104 @@
# standard imports
import logging
import time
from queue import Queue
from typing import Callable, Dict, Optional, Tuple, Union

# external imports
from cic_types.condiments import MetadataPointer

# local imports
from cic_ussd.cache import cache_data_key, get_cached_data
from cic_ussd.error import MaxRetryReached

logg = logging.getLogger()

# adapted from https://github.com/justiniso/polling/blob/master/polling.py
# opted not to use the package to reduce dependency
def poller(args: Optional[Tuple],
           interval: int,
           kwargs: Optional[Dict],
           max_retry: int,
           target: Callable[..., Union[Dict, str]]):
    """Polls the target callable until it returns a truthy value or an empty dict, sleeping for
    interval seconds between attempts and raising MaxRetryReached once max_retry attempts have failed.
    :param args: positional arguments to pass to the target callable.
    :type args: Optional[Tuple]
    :param interval: seconds to sleep between polling attempts.
    :type interval: int
    :param kwargs: keyword arguments to pass to the target callable.
    :type kwargs: Optional[Dict]
    :param max_retry: maximum number of polling attempts before giving up.
    :type max_retry: int
    :param target: callable whose return value is polled for.
    :type target: Callable[..., Union[Dict, str]]
    """
    collected_values: list = []
    expected_value = None
    tries = 0
    while True:
        if tries >= max_retry:
            raise MaxRetryReached(collected_values, expected_value)
        try:
            if args:
                value = target(*args)
            elif kwargs:
                value = target(**kwargs)
            else:
                value = target()
            expected_value = value
        # an empty exception tuple ignores nothing: any error raised by target propagates.
        except () as error:
            expected_value = error
        else:
            if bool(value) or value == {}:
                logg.debug(f'Resource: {expected_value} now available.')
                break
        collected_values.append(expected_value)
        logg.debug(f'Collected values are: {collected_values}')
        tries += 1
        time.sleep(interval)

def wait_for_cache(identifier: Union[list, bytes],
                   resource_name: str,
                   salt: MetadataPointer,
                   interval: int = 1,
                   max_retry: int = 5):
    """Polls the cache until data keyed by the identifier and salt becomes available, raising
    MaxRetryReached if it does not appear within max_retry attempts.
    :param identifier: identifier from which the cache key is built.
    :type identifier: Union[list, bytes]
    :param resource_name: human-readable name of the awaited resource, used in log output.
    :type resource_name: str
    :param salt: metadata pointer used to salt the cache key.
    :type salt: MetadataPointer
    :param interval: seconds to sleep between polling attempts.
    :type interval: int
    :param max_retry: maximum number of polling attempts before giving up.
    :type max_retry: int
    """
    key: str = cache_data_key(identifier=identifier, salt=salt)
    logg.debug(f'Polling for resource: {resource_name} at: {key} every: {interval} second(s) for {max_retry} seconds.')
    poller(args=(key,), interval=interval, kwargs=None, max_retry=max_retry, target=get_cached_data)

def wait_for_session_data(resource_name: str,
                          session_data_key: str,
                          ussd_session: dict,
                          interval: int = 1,
                          max_retry: int = 5):
    """Polls a ussd session until its 'data' mapping and the entry at session_data_key become
    available, raising MaxRetryReached if either does not appear within max_retry attempts.
    :param resource_name: human-readable name of the awaited resource, used in log output.
    :type resource_name: str
    :param session_data_key: key under the session's 'data' mapping to poll for.
    :type session_data_key: str
    :param ussd_session: ussd session object to poll.
    :type ussd_session: dict
    :param interval: seconds to sleep between polling attempts.
    :type interval: int
    :param max_retry: maximum number of polling attempts before giving up.
    :type max_retry: int
    """
    # poll for the data element first
    logg.debug(f'Data poller with max retry at: {max_retry}. Checking for every: {interval} seconds.')
    poller(args=('data',), interval=interval, kwargs=None, max_retry=max_retry, target=ussd_session.get)
    # poll for the session data element
    get_session_data = ussd_session.get('data').get
    logg.debug(f'Session data poller for: {resource_name} with max retry at: {max_retry}. Checking for every: {interval} seconds.')
    poller(args=(session_data_key,), interval=interval, kwargs=None, max_retry=max_retry, target=get_session_data)
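
For orientation, a minimal usage sketch (not part of the commit) of how a caller might block on cached account metadata with wait_for_cache before reading it. The active_token_symbol helper and the address value are hypothetical, and the sketch assumes the cic_ussd cache has been initialised as in the tests below.

# usage sketch: wait for an account's active token symbol to land in the cache, then read it.
from cic_types.condiments import MetadataPointer

from cic_ussd.cache import cache_data_key, get_cached_data
from cic_ussd.processor.poller import wait_for_cache


def active_token_symbol(blockchain_address: str) -> str:
    identifier = bytes.fromhex(blockchain_address)
    # raises MaxRetryReached if nothing is cached within roughly interval * max_retry seconds
    wait_for_cache(identifier, 'Active Token', MetadataPointer.TOKEN_ACTIVE, interval=1, max_retry=5)
    key = cache_data_key(identifier=identifier, salt=MetadataPointer.TOKEN_ACTIVE)
    return get_cached_data(key)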


@@ -0,0 +1,69 @@
# standard imports
import logging
import time
from queue import Queue

# external imports
import pytest
from cic_types.condiments import MetadataPointer

# local imports
from cic_ussd.cache import cache_data, cache_data_key, get_cached_data
from cic_ussd.error import MaxRetryReached
from cic_ussd.processor.poller import poller, wait_for_cache, wait_for_session_data

# test imports

def test_poller(activated_account, caplog, init_cache, token_symbol):
    caplog.set_level(logging.DEBUG)
    identifier = bytes.fromhex(activated_account.blockchain_address)
    key = cache_data_key(identifier, MetadataPointer.TOKEN_ACTIVE)
    with pytest.raises(MaxRetryReached) as error:
        interval = 1
        max_retry = 3
        collected_values = [None, None, None]
        poller(args=(key,), interval=interval, kwargs=None, max_retry=max_retry, target=get_cached_data)
    assert str(error.value) == str(MaxRetryReached(collected_values, None))
    cache_data(key, token_symbol)
    poller(args=(key,), interval=interval, kwargs=None, max_retry=max_retry, target=get_cached_data)
    assert f'Resource: {token_symbol} now available.' in caplog.text

def test_wait_for_cache(activated_account, caplog, init_cache, token_symbol):
    caplog.set_level(logging.DEBUG)
    identifier = bytes.fromhex(activated_account.blockchain_address)
    key = cache_data_key(identifier, MetadataPointer.TOKEN_ACTIVE)
    cache_data(key, token_symbol)
    interval = 1
    max_retry = 3
    resource_name = 'Active Token'
    wait_for_cache(identifier, resource_name, MetadataPointer.TOKEN_ACTIVE, interval, max_retry)
    assert f'Polling for resource: {resource_name} at: {key} every: {interval} second(s) for {max_retry} seconds.' in caplog.text

def test_wait_for_session_data(activated_account, caplog, generic_ussd_session):
    caplog.set_level(logging.DEBUG)
    generic_ussd_session.__delitem__('data')
    interval = 1
    max_retry = 3
    collected_values = [None, None, None]
    resource_name = 'Foo Data'
    session_data_key = 'foo'
    with pytest.raises(MaxRetryReached) as error:
        wait_for_session_data(resource_name, session_data_key, generic_ussd_session, interval, max_retry)
    assert str(error.value) == str(MaxRetryReached(collected_values, None))
    assert f'Data poller with max retry at: {max_retry}. Checking for every: {interval} seconds.' in caplog.text
    generic_ussd_session['data'] = {}
    with pytest.raises(MaxRetryReached) as error:
        collected_values = [None, None, None]
        wait_for_session_data(resource_name, session_data_key, generic_ussd_session, interval, max_retry)
    assert f'Data poller with max retry at: {max_retry}. Checking for every: {interval} seconds.' in caplog.text
    assert f'Session data poller for: {resource_name} with max retry at: {max_retry}. Checking for every: {interval} seconds.' in caplog.text
    assert str(error.value) == str(MaxRetryReached(collected_values, None))
    expected_value = 'bar'
    generic_ussd_session['data'] = {'foo': expected_value}
    wait_for_session_data(resource_name, session_data_key, generic_ussd_session, interval, max_retry)
    assert f'Data poller with max retry at: {max_retry}. Checking for every: {interval} seconds.' in caplog.text
    assert f'Session data poller for: {resource_name} with max retry at: {max_retry}. Checking for every: {interval} seconds.' in caplog.text
    assert f'Resource: {expected_value} now available.' in caplog.text
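
And a similarly hedged sketch of wait_for_session_data, mirroring the happy path at the end of the test above. The session contents and key names are hypothetical; in practice another worker would populate the ussd session concurrently.

# usage sketch: wait for a specific entry under the session's 'data' mapping before consuming it.
from cic_ussd.processor.poller import wait_for_session_data

ussd_session = {'data': {'recipient_phone_number': '+254712345678'}}
wait_for_session_data('Recipient Phone Number', 'recipient_phone_number', ussd_session, interval=1, max_retry=5)
recipient = ussd_session.get('data').get('recipient_phone_number')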