The great bump

This commit is contained in:
2021-08-06 16:29:01 +00:00
parent f764b73f66
commit 0672a17d2e
195 changed files with 5791 additions and 4983 deletions

View File

@@ -0,0 +1,68 @@
# standard imports
# external imports
import pytest
# local imports
from cic_ussd.account.balance import calculate_available_balance, get_balances, get_cached_available_balance
from cic_ussd.account.chain import Chain
from cic_ussd.error import CachedDataNotFoundError
# test imports
from tests.helpers.accounts import blockchain_address
def test_async_get_balances(activated_account,
celery_session_worker,
load_chain_spec,
load_config,
mock_async_balance_api_query):
blockchain_address = activated_account.blockchain_address
chain_str = Chain.spec.__str__()
token_symbol = load_config.get('TEST_TOKEN_SYMBOL')
get_balances(blockchain_address, chain_str, token_symbol, asynchronous=True)
assert mock_async_balance_api_query.get('address') == blockchain_address
assert mock_async_balance_api_query.get('token_symbol') == token_symbol
def test_sync_get_balances(activated_account,
balances,
celery_session_worker,
load_chain_spec,
load_config,
mock_sync_balance_api_query):
blockchain_address = activated_account.blockchain_address
chain_str = Chain.spec.__str__()
token_symbol = load_config.get('TEST_TOKEN_SYMBOL')
res = get_balances(blockchain_address, chain_str, token_symbol, asynchronous=False)
assert res == balances
@pytest.mark.parametrize('balance_incoming, balance_network, balance_outgoing, available_balance', [
(0, 50000000, 0, 50.00),
(5000000, 89000000, 67000000, 27.00)
])
def test_calculate_available_balance(activated_account,
balance_incoming,
balance_network,
balance_outgoing,
available_balance):
balances = {
'address': activated_account.blockchain_address,
'converters': [],
'balance_network': balance_network,
'balance_outgoing': balance_outgoing,
'balance_incoming': balance_incoming
}
assert calculate_available_balance(balances) == available_balance
def test_get_cached_available_balance(activated_account, cache_balances, balances):
cached_available_balance = get_cached_available_balance(activated_account.blockchain_address)
available_balance = calculate_available_balance(balances[0])
assert cached_available_balance == available_balance
address = blockchain_address()
with pytest.raises(CachedDataNotFoundError) as error:
cached_available_balance = get_cached_available_balance(address)
assert cached_available_balance is None
assert str(error.value) == f'No cached available balance for address: {address}'

View File

@@ -0,0 +1,28 @@
# standard imports
# external imports
import pytest
# local imports
from cic_ussd.account.maps import gender, language
# test imports
@pytest.mark.parametrize('key, expected_value', [
('1', 'male'),
('2', 'female'),
('3', 'other')
])
def test_gender(key, expected_value):
g_map = gender()
assert g_map[key] == expected_value
@pytest.mark.parametrize('key, expected_value', [
('1', 'en'),
('2', 'sw'),
])
def test_language(key, expected_value):
l_map = language()
assert l_map[key] == expected_value

View File

@@ -0,0 +1,28 @@
# standard imports
import json
# external imports
from cic_types.models.person import get_contact_data_from_vcard
# local imports
from cic_ussd.account.metadata import get_cached_preferred_language, parse_account_metadata
# test imports
from tests.helpers.accounts import blockchain_address
def test_get_cached_preferred_language(activated_account, cache_preferences, preferences):
cached_preferred_language = get_cached_preferred_language(activated_account.blockchain_address)
assert cached_preferred_language == preferences.get('preferred_language')
cached_preferred_language = get_cached_preferred_language(blockchain_address())
assert cached_preferred_language is None
def test_parse_account_metadata(person_metadata):
contact_information = get_contact_data_from_vcard(person_metadata.get('vcard'))
given_name = contact_information.get('given')
family_name = contact_information.get('family')
phone_number = contact_information.get('tel')
parsed_account_metadata = f'{given_name} {family_name} {phone_number}'
assert parse_account_metadata(person_metadata) == parsed_account_metadata

View File

@@ -0,0 +1,85 @@
# standard imports
import json
import time
# external imports
import pytest
import requests_mock
from chainlib.hash import strip_0x
# local imports
from cic_ussd.account.statement import (filter_statement_transactions,
generate,
get_cached_statement,
parse_statement_transactions,
query_statement,
statement_transaction_set)
from cic_ussd.account.transaction import transaction_actors
from cic_ussd.cache import cache_data_key, get_cached_data
# test imports
from tests.helpers.accounts import blockchain_address
def test_filter_statement_transactions(transactions_list):
assert len(transactions_list) == 4
assert len(filter_statement_transactions(transactions_list)) == 1
def test_generate(activated_account,
cache_preferences,
celery_session_worker,
init_cache,
init_database,
set_locale_files,
preferences,
preferences_metadata_url,
transactions_list):
with requests_mock.Mocker(real_http=False) as request_mocker:
request_mocker.register_uri('GET', preferences_metadata_url, status_code=200, reason='OK', json=preferences)
statement_transactions = filter_statement_transactions(transactions_list)
for transaction in statement_transactions:
querying_party = activated_account.blockchain_address
recipient_transaction, sender_transaction = transaction_actors(transaction)
if recipient_transaction.get('blockchain_address') == querying_party:
generate(querying_party, None, recipient_transaction)
if sender_transaction.get('blockchain_address') == querying_party:
generate(querying_party, None, sender_transaction)
time.sleep(2)
identifier = bytes.fromhex(strip_0x(activated_account.blockchain_address))
key = cache_data_key(identifier, ':cic.statement')
statement = get_cached_data(key)
statement = json.loads(statement)
assert len(statement) == 1
def test_get_cached_statement(activated_account, cache_statement, statement):
cached_statement = get_cached_statement(activated_account.blockchain_address)
assert cached_statement is not None
cached_statement = json.loads(cached_statement)
assert cached_statement[0].get('blockchain_address') == statement[0].get('blockchain_address')
def test_parse_statement_transactions(statement):
parsed_transactions = parse_statement_transactions(statement)
parsed_transaction = parsed_transactions[0]
parsed_transaction.startswith('Sent')
@pytest.mark.parametrize('blockchain_address, limit', [
(blockchain_address(), 10),
(blockchain_address(), 5)
])
def test_query_statement(blockchain_address, limit, load_chain_spec, activated_account, mock_transaction_list_query):
query_statement(blockchain_address, limit)
assert mock_transaction_list_query.get('address') == blockchain_address
assert mock_transaction_list_query.get('limit') == limit
def test_statement_transaction_set(preferences, set_locale_files, statement):
parsed_transactions = parse_statement_transactions(statement)
preferred_language = preferences.get('preferred_language')
transaction_set = statement_transaction_set(preferred_language, parsed_transactions)
transaction_set.startswith('Sent')
transaction_set = statement_transaction_set(preferred_language, [])
transaction_set.startswith('No')

View File

@@ -0,0 +1,42 @@
# standard imports
import json
# external imports
import pytest
# local imports
from cic_ussd.account.chain import Chain
from cic_ussd.account.tokens import get_cached_default_token, get_default_token_symbol, query_default_token
# test imports
def test_get_cached_default_token(cache_default_token_data, default_token_data, load_chain_spec):
chain_str = Chain.spec.__str__()
cached_default_token = get_cached_default_token(chain_str)
cached_default_token_data = json.loads(cached_default_token)
assert cached_default_token_data['symbol'] == default_token_data['symbol']
assert cached_default_token_data['address'] == default_token_data['address']
assert cached_default_token_data['name'] == default_token_data['name']
assert cached_default_token_data['decimals'] == default_token_data['decimals']
def test_get_default_token_symbol_from_api(default_token_data, load_chain_spec, mock_sync_default_token_api_query):
default_token_symbol = get_default_token_symbol()
assert default_token_symbol == default_token_data['symbol']
def test_query_default_token(default_token_data, load_chain_spec, mock_sync_default_token_api_query):
chain_str = Chain.spec.__str__()
queried_default_token_data = query_default_token(chain_str)
assert queried_default_token_data['symbol'] == default_token_data['symbol']
assert queried_default_token_data['address'] == default_token_data['address']
assert queried_default_token_data['name'] == default_token_data['name']
assert queried_default_token_data['decimals'] == default_token_data['decimals']
def test_get_default_token_symbol_from_cache(cache_default_token_data, default_token_data, load_chain_spec):
default_token_symbol = get_default_token_symbol()
assert default_token_symbol is not None
assert default_token_symbol == default_token_data.get('symbol')

View File

@@ -0,0 +1,110 @@
# standard imports
from decimal import Decimal
# external imports
import pytest
# local imports
from cic_ussd.account.chain import Chain
from cic_ussd.account.transaction import (aux_transaction_data,
from_wei,
to_wei,
truncate,
transaction_actors,
validate_transaction_account,
OutgoingTransaction)
from cic_ussd.db.models.account import Account
from cic_ussd.error import UnknownUssdRecipient
from cic_ussd.translation import translation_for
# test imports
def check_aux_data(action_tag_key, direction_tag_key, preferred_language, transaction_with_aux_data):
assert transaction_with_aux_data.get('action_tag') == translation_for(action_tag_key, preferred_language)
assert transaction_with_aux_data.get('direction_tag') == translation_for(direction_tag_key, preferred_language)
def test_aux_transaction_data(preferences, set_locale_files, transactions_list):
sample_transaction = transactions_list[0]
preferred_language = preferences.get('preferred_language')
recipient_transaction, sender_transaction = transaction_actors(sample_transaction)
recipient_tx_aux_data = aux_transaction_data(preferred_language, recipient_transaction)
check_aux_data('helpers.received', 'helpers.from', preferred_language, recipient_tx_aux_data)
sender_tx_aux_data = aux_transaction_data(preferred_language, sender_transaction)
check_aux_data('helpers.sent', 'helpers.to', preferred_language, sender_tx_aux_data)
@pytest.mark.parametrize("wei, expected_result", [
(50000000, Decimal('50.00')),
(100000, Decimal('0.10'))
])
def test_from_wei(wei, expected_result):
assert from_wei(wei) == expected_result
@pytest.mark.parametrize("value, expected_result", [
(50, 50000000),
(0.10, 100000)
])
def test_to_wei(value, expected_result):
assert to_wei(value) == expected_result
@pytest.mark.parametrize("decimals, value, expected_result", [
(3, 1234.32875, 1234.328),
(2, 98.998, 98.99)
])
def test_truncate(decimals, value, expected_result):
assert truncate(value=value, decimals=decimals).__float__() == expected_result
def test_transaction_actors(activated_account, transaction_result, valid_recipient):
recipient_transaction, sender_transaction = transaction_actors(transaction_result)
assert recipient_transaction.get('blockchain_address') == valid_recipient.blockchain_address
assert sender_transaction.get('blockchain_address') == activated_account.blockchain_address
assert recipient_transaction.get('role') == 'recipient'
assert sender_transaction.get('role') == 'sender'
assert recipient_transaction.get('token_symbol') == transaction_result.get('destination_token_symbol')
assert sender_transaction.get('token_symbol') == transaction_result.get('source_token_symbol')
assert recipient_transaction.get('token_value') == transaction_result.get('destination_token_value')
assert sender_transaction.get('token_value') == transaction_result.get('source_token_value')
def test_validate_transaction_account(activated_account, init_database, transactions_list):
sample_transaction = transactions_list[0]
recipient_transaction, sender_transaction = transaction_actors(sample_transaction)
recipient_account = validate_transaction_account(init_database, recipient_transaction)
sender_account = validate_transaction_account(init_database, sender_transaction)
assert isinstance(recipient_account, Account)
assert isinstance(sender_account, Account)
sample_transaction = transactions_list[1]
recipient_transaction, sender_transaction = transaction_actors(sample_transaction)
with pytest.raises(UnknownUssdRecipient) as error:
validate_transaction_account(init_database, recipient_transaction)
assert str(
error.value) == f'Tx for recipient: {recipient_transaction.get("blockchain_address")} has no matching account in the system.'
validate_transaction_account(init_database, sender_transaction)
assert f'Tx from sender: {sender_transaction.get("blockchain_address")} has no matching account in system.'
@pytest.mark.parametrize("amount", [50, 0.10])
def test_outgoing_transaction_processor(activated_account,
amount,
celery_session_worker,
load_config,
load_chain_spec,
mock_transfer_api,
valid_recipient):
chain_str = Chain.spec.__str__()
token_symbol = load_config.get('TEST_TOKEN_SYMBOL')
outgoing_tx_processor = OutgoingTransaction(chain_str,
activated_account.blockchain_address,
valid_recipient.blockchain_address)
outgoing_tx_processor.transfer(amount, token_symbol)
assert mock_transfer_api.get('from_address') == activated_account.blockchain_address
assert mock_transfer_api.get('to_address') == valid_recipient.blockchain_address
assert mock_transfer_api.get('value') == to_wei(amount)
assert mock_transfer_api.get('token_symbol') == token_symbol

View File

@@ -0,0 +1,96 @@
# standard imports
import json
# external imports
import pytest
from cic_types.models.person import get_contact_data_from_vcard
# local imports
from cic_ussd.account.chain import Chain
from cic_ussd.cache import get_cached_data
from cic_ussd.db.enum import AccountStatus
from cic_ussd.db.models.account import Account, create, cache_creation_task_uuid
from cic_ussd.db.models.task_tracker import TaskTracker
# test imports
from tests.helpers.accounts import blockchain_address, phone_number
def test_account(init_database, set_fernet_key):
address = blockchain_address()
phone = phone_number()
account = Account(address, phone)
account.create_password('0000')
account.activate_account()
init_database.add(account)
init_database.commit()
account = init_database.query(Account).get(1)
assert account.blockchain_address == address
assert account.phone_number == phone
assert account.failed_pin_attempts == 0
assert account.verify_password('0000') is True
assert account.get_status(init_database) == AccountStatus.ACTIVE.name
def test_account_repr(activated_account):
assert repr(activated_account) == f'<Account: {activated_account.blockchain_address}>'
def test_account_statuses(init_database, pending_account):
assert pending_account.get_status(init_database) == AccountStatus.PENDING.name
pending_account.create_password('1111')
pending_account.activate_account()
init_database.add(pending_account)
init_database.commit()
assert pending_account.get_status(init_database) == AccountStatus.ACTIVE.name
pending_account.failed_pin_attempts = 3
assert pending_account.get_status(init_database) == AccountStatus.LOCKED.name
pending_account.reset_pin(init_database)
assert pending_account.get_status(init_database) == AccountStatus.RESET.name
pending_account.activate_account()
assert pending_account.get_status(init_database) == AccountStatus.ACTIVE.name
def test_get_by_phone_number(activated_account, init_database):
account = Account.get_by_phone_number(activated_account.phone_number, init_database)
assert account == activated_account
def test_has_preferred_language(activated_account, cache_preferences):
assert activated_account.has_preferred_language() is True
def test_lacks_preferred_language(activated_account):
assert activated_account.has_preferred_language() is False
def test_has_valid_pin(activated_account, init_database, pending_account):
assert activated_account.has_valid_pin(init_database) is True
assert pending_account.has_valid_pin(init_database) is False
def test_pin_is_blocked(activated_account, init_database):
assert activated_account.pin_is_blocked(init_database) is False
activated_account.failed_pin_attempts = 3
init_database.add(activated_account)
init_database.commit()
assert activated_account.pin_is_blocked(init_database) is True
def test_standard_metadata_id(activated_account, cache_person_metadata, pending_account, person_metadata):
contact_information = get_contact_data_from_vcard(person_metadata.get('vcard'))
given_name = contact_information.get('given')
family_name = contact_information.get('family')
phone_number = contact_information.get('tel')
parsed_account_metadata = f'{given_name} {family_name} {phone_number}'
assert activated_account.standard_metadata_id() == parsed_account_metadata
assert pending_account.standard_metadata_id() == pending_account.phone_number
def test_account_create(init_cache, init_database, load_chain_spec, mock_account_creation_task_result, task_uuid):
chain_str = Chain.spec.__str__()
create(chain_str, phone_number(), init_database)
assert len(init_database.query(TaskTracker).all()) == 1
account_creation_data = get_cached_data(task_uuid)
assert json.loads(account_creation_data).get('status') == AccountStatus.PENDING.name

View File

@@ -6,9 +6,8 @@ def test_task_tracker(init_database):
task_uuid = '31e85315-feee-4b6d-995e-223569082cc4'
task_in_tracker = TaskTracker(task_uuid=task_uuid)
session = init_database
session.add(task_in_tracker)
session.commit()
init_database.add(task_in_tracker)
init_database.commit()
queried_task = session.query(TaskTracker).get(1)
queried_task = init_database.query(TaskTracker).get(1)
assert queried_task.task_uuid == task_uuid

View File

@@ -1,40 +0,0 @@
"""Tests the persistence of the user record and associated functions to the user object"""
# standard imports
import pytest
# platform imports
from cic_ussd.db.models.account import Account
def test_user(init_database, set_fernet_key):
user = Account(blockchain_address='0x417f5962fc52dc33ff0689659b25848680dec6dcedc6785b03d1df60fc6d5c51',
phone_number='+254700000000')
user.create_password('0000')
session = Account.session
session.add(user)
session.commit()
queried_user = session.query(Account).get(1)
assert queried_user.blockchain_address == '0x417f5962fc52dc33ff0689659b25848680dec6dcedc6785b03d1df60fc6d5c51'
assert queried_user.phone_number == '+254700000000'
assert queried_user.failed_pin_attempts == 0
assert queried_user.verify_password('0000') is True
def test_user_state_transition(create_pending_user):
user = create_pending_user
session = Account.session
assert user.get_account_status() == 'PENDING'
user.activate_account()
assert user.get_account_status() == 'ACTIVE'
user.failed_pin_attempts = 3
assert user.get_account_status() == 'LOCKED'
user.reset_account_pin()
assert user.get_account_status() == 'RESET'
user.activate_account()
assert user.get_account_status() == 'ACTIVE'
session.add(user)
session.commit()

View File

@@ -1,54 +1,78 @@
# standard imports
import os
# third party imports
import pytest
from sqlalchemy import desc
# local imports
from cic_ussd.db.models.ussd_session import UssdSession
from cic_ussd.error import VersionTooLowError
def test_ussd_session(init_database, create_in_redis_ussd_session, create_activated_user):
session = init_database
def test_ussd_session(activated_account, init_database, init_cache, load_config):
valid_service_codes = load_config.get('USSD_SERVICE_CODE').split(",")
ussd_session = UssdSession(
external_session_id='AT65423',
service_code='*123#',
msisdn=create_activated_user.phone_number,
external_session_id=os.urandom(20).hex(),
service_code=valid_service_codes[0],
msisdn=activated_account.phone_number,
user_input='1',
state='start',
session_data={},
data={},
version=1,
)
session.add(ussd_session)
session.commit()
init_database.add(ussd_session)
init_database.commit()
ussd_session.set_data(key='foo', session=init_database, value='bar')
assert ussd_session.get_data('foo') == 'bar'
ussd_session.update(
session=init_database,
user_input='3',
state='next',
version=2
)
ussd_session.update('3', 'next', 2, init_database)
assert ussd_session.version == 2
session.add(ussd_session)
session.commit()
init_database.add(ussd_session)
init_database.commit()
assert UssdSession.have_session_for_phone(create_activated_user.phone_number) is True
assert UssdSession.has_record_for_phone_number(activated_account.phone_number, init_database) is True
def test_version_too_low_error(init_database, create_in_redis_ussd_session, create_activated_user):
def test_version_too_low_error(activated_account, init_database, init_cache):
with pytest.raises(VersionTooLowError) as e:
session = UssdSession(
external_session_id='AT38745',
service_code='*123#',
msisdn=create_activated_user.phone_number,
msisdn=activated_account.phone_number,
user_input='1',
state='start',
session_data={},
data={},
version=3,
)
assert session.check_version(1)
assert session.check_version(3)
assert str(e.value) == 'New session version number is not greater than last saved version!'
def test_set_data(init_database, ussd_session_data, persisted_ussd_session):
assert persisted_ussd_session.data == {}
for key, value in ussd_session_data.items():
persisted_ussd_session.set_data(key, init_database, value)
init_database.commit()
assert persisted_ussd_session.get_data('recipient') == ussd_session_data.get('recipient')
def test_has_record_for_phone_number(activated_account, init_database, persisted_ussd_session):
ussd_session = UssdSession.has_record_for_phone_number(activated_account.phone_number, init_database)
assert ussd_session is not None
def test_last_ussd_session(init_database, ussd_session_traffic):
assert len(init_database.query(UssdSession).all()) >= 5
ussd_session = init_database.query(UssdSession).order_by(desc(UssdSession.created)).first()
phone_number = ussd_session.msisdn
assert UssdSession.last_ussd_session(phone_number, init_database).id == ussd_session.id
def test_persisted_to_json(persisted_ussd_session):
assert isinstance(persisted_ussd_session, UssdSession)
assert isinstance(persisted_ussd_session.to_json(), dict)

View File

@@ -10,11 +10,6 @@ from cic_ussd.db import dsn_from_config
def test_dsn_from_config(load_config):
"""
"""
# test dsn for sqlite engine
dsn = dsn_from_config(load_config)
scheme = f'{load_config.get("DATABASE_ENGINE")}+{load_config.get("DATABASE_DRIVER")}'
assert dsn == f'{scheme}:///{load_config.get("DATABASE_NAME")}'
# test dsn for other db formats
overrides = {
'DATABASE_PASSWORD': 'password',
@@ -28,3 +23,16 @@ def test_dsn_from_config(load_config):
dsn = dsn_from_config(load_config)
assert dsn == f"{scheme}://{load_config.get('DATABASE_USER')}:{load_config.get('DATABASE_PASSWORD')}@{load_config.get('DATABASE_HOST')}:{load_config.get('DATABASE_PORT')}/{load_config.get('DATABASE_NAME')}"
# undoes overrides to revert engine and drivers to sqlite
overrides = {
'DATABASE_PASSWORD': '',
'DATABASE_DRIVER': 'pysqlite',
'DATABASE_ENGINE': 'sqlite'
}
load_config.dict_override(dct=overrides, dct_description='Override values to test different db formats.')
# test dsn for sqlite engine
dsn = dsn_from_config(load_config)
scheme = f'{load_config.get("DATABASE_ENGINE")}+{load_config.get("DATABASE_DRIVER")}'
assert dsn == f'{scheme}:///{load_config.get("DATABASE_NAME")}'

View File

@@ -36,7 +36,7 @@ def test_json_file_parser(load_config):
WHEN the json_file_parser function is passed a directory path containing JSON files
THEN it dynamically loads all the files and compiles their content into one python array
"""
files_dir = load_config.get('STATEMACHINE_TRANSITIONS')
files_dir = load_config.get('MACHINE_TRANSITIONS')
files_dir = os.path.join(root_directory, files_dir)
# total files len
@@ -46,10 +46,7 @@ def test_json_file_parser(load_config):
# get path of data files
filepath = os.path.join(files_dir, filepath)
# read file content
data_file = open(filepath)
data = json.load(data_file)
file_content_length += len(data)
data_file.close()
with open(filepath) as data_file:
data = json.load(data_file)
file_content_length += len(data)
assert len(json_file_parser(filepath=files_dir)) == file_content_length

View File

@@ -0,0 +1,69 @@
# standard imports
from urllib.parse import urlparse, parse_qs
# external imports
import pytest
import requests
from requests.exceptions import HTTPError
import requests_mock
# local imports
from cic_ussd.http.requests import (error_handler,
get_query_parameters,
get_request_endpoint,
get_request_method,
make_request)
from cic_ussd.error import UnsupportedMethodError
# test imports
@pytest.mark.parametrize('status_code, starts_with', [
(102, 'Informational errors'),
(303, 'Redirect Issues'),
(406, 'Client Error'),
(500, 'Server Error')
])
def test_error_handler(status_code, starts_with, mocker):
mock_result = mocker.patch('requests.Response')
mock_result.status_code = status_code
with pytest.raises(HTTPError) as error:
error_handler(mock_result)
assert str(error.value).startswith(starts_with)
def test_get_query_parameters(with_params_env):
assert get_query_parameters(with_params_env, 'phone') == with_params_env.get('REQUEST_URI')[8:]
parsed_url = urlparse(with_params_env.get('REQUEST_URI'))
params = parse_qs(parsed_url.query)
assert get_query_parameters(with_params_env) == params
def test_get_request_endpoint(with_params_env):
assert get_request_endpoint(with_params_env) == with_params_env.get('PATH_INFO')
def test_get_request_method(with_params_env):
assert get_request_method(with_params_env) == with_params_env.get('REQUEST_METHOD')
def test_make_request(mock_response, mock_url):
with requests_mock.Mocker(real_http=False) as request_mocker:
request_mocker.register_uri('GET', mock_url, status_code=200, reason='OK', json=mock_response)
response = make_request(method='GET', url=mock_url)
assert response.json() == requests.get(mock_url).json()
with requests_mock.Mocker(real_http=False) as request_mocker:
request_mocker.register_uri('POST', mock_url, status_code=201, reason='CREATED', json=mock_response)
response = make_request('POST', mock_url, {'test': 'data'})
assert response.content == requests.post(mock_url).content
with requests_mock.Mocker(real_http=False) as request_mocker:
request_mocker.register_uri('PUT', mock_url, status_code=200, reason='OK')
response = make_request('PUT', mock_url, data={'test': 'data'})
assert response.content == requests.put(mock_url).content
with pytest.raises(UnsupportedMethodError) as error:
with requests_mock.Mocker(real_http=False) as request_mocker:
request_mocker.register_uri('DELETE', mock_url, status_code=200, reason='OK')
make_request('DELETE', mock_url)
assert str(error.value) == 'Unsupported method: DELETE'

View File

@@ -0,0 +1,18 @@
# standard imports
# external imports
import pytest
# local imports
from cic_ussd.http.responses import with_content_headers
# test imports
@pytest.mark.parametrize('headers, response, expected_result',[
([('Content-Type', 'text/plain')], 'some-text', (b'some-text', [('Content-Type', 'text/plain'), ('Content-Length', '9')])),
([('Content-Type', 'text/plain'), ('Content-Length', '0')], 'some-text', (b'some-text', [('Content-Type', 'text/plain'), ('Content-Length', '9')]))
])
def test_with_content_headers(headers, response, expected_result):
response_bytes, headers = with_content_headers(headers, response)
assert response_bytes, headers == expected_result

View File

@@ -0,0 +1,45 @@
# standard imports
import json
# external imports
import pytest
# local imports
from cic_ussd.db.models.account import Account
from cic_ussd.http.routes import locked_accounts, pin_reset
# test imports
@pytest.mark.parametrize('method, expected_response, expected_message', [
('GET', '{"status": "LOCKED"}', '200 OK'),
('PUT', 'Pin reset successful.', '200 OK'),
])
def test_pin_reset(method, expected_response, expected_message, init_database, pin_blocked_account, uwsgi_env):
uwsgi_env['REQUEST_METHOD'] = method
response, message = pin_reset(uwsgi_env, pin_blocked_account.phone_number, init_database)
assert response == expected_response
assert message == expected_message
response, message = pin_reset(uwsgi_env, '070000000', init_database)
assert response == ''
assert message == '404 Not found'
def test_locked_accounts(init_database, locked_accounts_env, locked_accounts_traffic):
response, message = locked_accounts(locked_accounts_env, init_database)
assert message == '200 OK'
locked_account_addresses = json.loads(response)
assert len(locked_account_addresses) == 10
account_1 = init_database.query(Account).filter_by(blockchain_address=locked_account_addresses[2]).first()
account_2 = init_database.query(Account).filter_by(blockchain_address=locked_account_addresses[7]).first()
assert account_1.updated > account_2.updated
locked_accounts_env['PATH_INFO'] = '/accounts/locked/10'
response, message = locked_accounts(locked_accounts_env, init_database)
assert message == '200 OK'
locked_account_addresses = json.loads(response)
assert len(locked_account_addresses) == 10
locked_accounts_env['REQUEST_METHOD'] = 'POST'
response, message = locked_accounts(locked_accounts_env, init_database)
assert message == '405 Play by the rules'
assert response == ''

View File

@@ -1,12 +1,14 @@
# standard imports
import os
# third party imports
# external imports
import pytest
# local imports
from cic_ussd.files.local_files import create_local_file_data_stores
from cic_ussd.menu.ussd_menu import UssdMenu
# tests imports
from tests.helpers.tmp_files import create_tmp_file
@@ -49,4 +51,3 @@ def test_failed_create_ussd_menu():
assert str(error.value) == "Menu already exists!"
os.close(descriptor)
os.remove(tmp_file)

View File

@@ -0,0 +1,44 @@
# standard imports
import json
import os
# external imports
import requests_mock
from chainlib.hash import strip_0x
from cic_types.processor import generate_metadata_pointer
# local imports
from cic_ussd.metadata.base import MetadataRequestsHandler
# external imports
def test_metadata_requests_handler(activated_account,
init_cache,
load_config,
person_metadata,
setup_metadata_request_handler,
setup_metadata_signer):
identifier = bytes.fromhex(strip_0x(activated_account.blockchain_address))
cic_type = ':cic.person'
metadata_client = MetadataRequestsHandler(cic_type, identifier)
assert metadata_client.cic_type == cic_type
assert metadata_client.engine == 'pgp'
assert metadata_client.identifier == identifier
assert metadata_client.metadata_pointer == generate_metadata_pointer(identifier, cic_type)
assert metadata_client.url == os.path.join(load_config.get('CIC_META_URL'), metadata_client.metadata_pointer)
with requests_mock.Mocker(real_http=False) as request_mocker:
request_mocker.register_uri('POST', metadata_client.url, status_code=200, reason='OK', json=person_metadata)
person_metadata['digest'] = os.urandom(20).hex()
request_mocker.register_uri('PUT', metadata_client.url, status_code=200, reason='OK', json=person_metadata)
result = metadata_client.create(person_metadata)
assert result.json() == person_metadata
assert result.status_code == 200
person_metadata.pop('digest')
request_mocker.register_uri('GET', metadata_client.url, status_code=200, reason='OK', json=person_metadata)
result = metadata_client.query()
assert result == person_metadata
cached_metadata = metadata_client.get_cached_metadata()
assert json.loads(cached_metadata) == person_metadata

View File

@@ -0,0 +1,22 @@
# standard imports
import os
# external imports
from chainlib.hash import strip_0x
from cic_types.processor import generate_metadata_pointer
# local imports
from cic_ussd.metadata import CustomMetadata
# test imports
def test_custom_metadata(activated_account, load_config, setup_metadata_request_handler, setup_metadata_signer):
cic_type = ':cic.custom'
identifier = bytes.fromhex(strip_0x(activated_account.blockchain_address))
custom_metadata_client = CustomMetadata(identifier)
assert custom_metadata_client.cic_type == cic_type
assert custom_metadata_client.engine == 'pgp'
assert custom_metadata_client.identifier == identifier
assert custom_metadata_client.metadata_pointer == generate_metadata_pointer(identifier, cic_type)
assert custom_metadata_client.url == os.path.join(
load_config.get('CIC_META_URL'), custom_metadata_client.metadata_pointer)

View File

@@ -1,80 +0,0 @@
# standard imports
import json
# third-party imports
import pytest
import requests
import requests_mock
# local imports
from cic_ussd.error import UnsupportedMethodError
from cic_ussd.metadata import blockchain_address_to_metadata_pointer, make_request
def test_make_request(define_metadata_pointer_url, mock_meta_get_response, mock_meta_post_response, person_metadata):
with requests_mock.Mocker(real_http=False) as request_mocker:
request_mocker.register_uri(
'GET',
define_metadata_pointer_url,
status_code=200,
reason='OK',
content=json.dumps(mock_meta_get_response).encode('utf-8')
)
response = make_request(method='GET', url=define_metadata_pointer_url)
assert response.content == requests.get(define_metadata_pointer_url).content
with requests_mock.Mocker(real_http=False) as request_mocker:
request_mocker.register_uri(
'POST',
define_metadata_pointer_url,
status_code=201,
reason='CREATED',
content=json.dumps(mock_meta_post_response).encode('utf-8')
)
response = make_request(
method='POST',
url=define_metadata_pointer_url,
data=json.dumps(person_metadata).encode('utf-8'),
headers={
'X-CIC-AUTOMERGE': 'server',
'Content-Type': 'application/json'
}
)
assert response.content == requests.post(define_metadata_pointer_url).content
with requests_mock.Mocker(real_http=False) as request_mocker:
request_mocker.register_uri(
'PUT',
define_metadata_pointer_url,
status_code=200,
reason='OK'
)
response = make_request(
method='PUT',
url=define_metadata_pointer_url,
data=json.dumps(person_metadata).encode('utf-8'),
headers={
'X-CIC-AUTOMERGE': 'server',
'Content-Type': 'application/json'
}
)
assert response.content == requests.put(define_metadata_pointer_url).content
with pytest.raises(UnsupportedMethodError) as error:
with requests_mock.Mocker(real_http=False) as request_mocker:
request_mocker.register_uri(
'DELETE',
define_metadata_pointer_url,
status_code=200,
reason='OK'
)
make_request(
method='DELETE',
url=define_metadata_pointer_url
)
assert str(error.value) == 'Unsupported method: DELETE'
def test_blockchain_address_to_metadata_pointer(create_activated_user):
blockchain_address = create_activated_user.blockchain_address
assert type(blockchain_address_to_metadata_pointer(blockchain_address)) == bytes

View File

@@ -0,0 +1,22 @@
# standard imports
import os
# external imports
from chainlib.hash import strip_0x
from cic_types.processor import generate_metadata_pointer
# local imports
from cic_ussd.metadata import PersonMetadata
# test imports
def test_person_metadata(activated_account, load_config, setup_metadata_request_handler, setup_metadata_signer):
cic_type = ':cic.person'
identifier = bytes.fromhex(strip_0x(activated_account.blockchain_address))
person_metadata_client = PersonMetadata(identifier)
assert person_metadata_client.cic_type == cic_type
assert person_metadata_client.engine == 'pgp'
assert person_metadata_client.identifier == identifier
assert person_metadata_client.metadata_pointer == generate_metadata_pointer(identifier, cic_type)
assert person_metadata_client.url == os.path.join(
load_config.get('CIC_META_URL'), person_metadata_client.metadata_pointer)

View File

@@ -0,0 +1,23 @@
# standard imports
import os
# external imports
from chainlib.hash import strip_0x
from cic_types.processor import generate_metadata_pointer
# local imports
from cic_ussd.metadata import PhonePointerMetadata
# test imports
def test_phone_pointer_metadata(activated_account, load_config, setup_metadata_request_handler, setup_metadata_signer):
cic_type = ':cic.phone'
identifier = bytes.fromhex(strip_0x(activated_account.blockchain_address))
phone_pointer_metadata = PhonePointerMetadata(identifier)
assert phone_pointer_metadata.cic_type == cic_type
assert phone_pointer_metadata.engine == 'pgp'
assert phone_pointer_metadata.identifier == identifier
assert phone_pointer_metadata.metadata_pointer == generate_metadata_pointer(identifier, cic_type)
assert phone_pointer_metadata.url == os.path.join(
load_config.get('CIC_META_URL'), phone_pointer_metadata.metadata_pointer)

View File

@@ -0,0 +1,22 @@
# standard imports
import os
# external imports
from chainlib.hash import strip_0x
from cic_types.processor import generate_metadata_pointer
# local imports
from cic_ussd.metadata import PreferencesMetadata
# test imports
def test_preferences_metadata(activated_account, load_config, setup_metadata_request_handler, setup_metadata_signer):
cic_type = ':cic.preferences'
identifier = bytes.fromhex(strip_0x(activated_account.blockchain_address))
preferences_metadata_client = PreferencesMetadata(identifier)
assert preferences_metadata_client.cic_type == cic_type
assert preferences_metadata_client.engine == 'pgp'
assert preferences_metadata_client.identifier == identifier
assert preferences_metadata_client.metadata_pointer == generate_metadata_pointer(identifier, cic_type)
assert preferences_metadata_client.url == os.path.join(
load_config.get('CIC_META_URL'), preferences_metadata_client.metadata_pointer)

View File

@@ -9,26 +9,9 @@ from cic_ussd.metadata.signer import Signer
def test_client(load_config, setup_metadata_signer, person_metadata):
signer = Signer()
# get gpg used
digest = 'a4337bc45a8fc544c03f52dc550cd6e1e87021bc896588bd79e901e2'
person_metadata['digest'] = digest
gpg = signer.gpg
# check that key data was loaded
assert signer.key_data is not None
# check that correct operational key is returned
gpg.import_keys(key_data=signer.key_data)
gpg_keys = gpg.list_keys()
assert signer.get_operational_key() == gpg_keys[0]
# check that correct signature is returned
key_id = signer.get_operational_key().get('keyid')
signature = gpg.sign(message=digest, passphrase=load_config.get('KEYS_PASSPHRASE'), keyid=key_id)
assert str(signature) == signer.sign_digest(data=person_metadata)
# remove tmp gpg file
shutil.rmtree(Signer.gpg_path)

View File

@@ -1,123 +0,0 @@
# standard imports
import json
# third-party imports
import pytest
import requests_mock
from cic_types.models.person import generate_metadata_pointer
# local imports
from cic_ussd.metadata import blockchain_address_to_metadata_pointer
from cic_ussd.metadata.signer import Signer
from cic_ussd.metadata.person import PersonMetadata
from cic_ussd.redis import get_cached_data
def test_user_metadata(create_activated_user, define_metadata_pointer_url, load_config):
PersonMetadata.base_url = load_config.get('CIC_META_URL')
identifier = blockchain_address_to_metadata_pointer(blockchain_address=create_activated_user.blockchain_address)
person_metadata_client = PersonMetadata(identifier=identifier)
assert person_metadata_client.url == define_metadata_pointer_url
def test_create_person_metadata(caplog,
create_activated_user,
define_metadata_pointer_url,
load_config,
mock_meta_post_response,
person_metadata):
identifier = blockchain_address_to_metadata_pointer(blockchain_address=create_activated_user.blockchain_address)
person_metadata_client = PersonMetadata(identifier=identifier)
with requests_mock.Mocker(real_http=False) as request_mocker:
request_mocker.register_uri(
'POST',
define_metadata_pointer_url,
status_code=201,
reason='CREATED',
content=json.dumps(mock_meta_post_response).encode('utf-8')
)
person_metadata_client.create(data=person_metadata)
assert 'Get signed material response status: 201' in caplog.text
with pytest.raises(RuntimeError) as error:
with requests_mock.Mocker(real_http=False) as request_mocker:
request_mocker.register_uri(
'POST',
define_metadata_pointer_url,
status_code=400,
reason='BAD REQUEST'
)
person_metadata_client.create(data=person_metadata)
assert str(error.value) == f'400 Client Error: BAD REQUEST for url: {define_metadata_pointer_url}'
def test_edit_person_metadata(caplog,
create_activated_user,
define_metadata_pointer_url,
load_config,
person_metadata,
setup_metadata_signer):
Signer.gpg_passphrase = load_config.get('KEYS_PASSPHRASE')
identifier = blockchain_address_to_metadata_pointer(blockchain_address=create_activated_user.blockchain_address)
person_metadata_client = PersonMetadata(identifier=identifier)
with requests_mock.Mocker(real_http=False) as request_mocker:
request_mocker.register_uri(
'PUT',
define_metadata_pointer_url,
status_code=200,
reason='OK'
)
person_metadata_client.edit(data=person_metadata)
assert 'Signed content submission status: 200' in caplog.text
with pytest.raises(RuntimeError) as error:
with requests_mock.Mocker(real_http=False) as request_mocker:
request_mocker.register_uri(
'PUT',
define_metadata_pointer_url,
status_code=400,
reason='BAD REQUEST'
)
person_metadata_client.edit(data=person_metadata)
assert str(error.value) == f'400 Client Error: BAD REQUEST for url: {define_metadata_pointer_url}'
def test_get_user_metadata(caplog,
create_activated_user,
define_metadata_pointer_url,
init_redis_cache,
load_config,
person_metadata,
setup_metadata_signer):
identifier = blockchain_address_to_metadata_pointer(blockchain_address=create_activated_user.blockchain_address)
person_metadata_client = PersonMetadata(identifier=identifier)
with requests_mock.Mocker(real_http=False) as request_mocker:
request_mocker.register_uri(
'GET',
define_metadata_pointer_url,
status_code=200,
content=json.dumps(person_metadata).encode('utf-8'),
reason='OK'
)
person_metadata_client.query()
assert 'Get latest data status: 200' in caplog.text
key = generate_metadata_pointer(
identifier=identifier,
cic_type=':cic.person'
)
cached_user_metadata = get_cached_data(key=key)
assert cached_user_metadata
with pytest.raises(RuntimeError) as error:
with requests_mock.Mocker(real_http=False) as request_mocker:
request_mocker.register_uri(
'GET',
define_metadata_pointer_url,
status_code=404,
reason='NOT FOUND'
)
person_metadata_client.query()
assert 'The data is not available and might need to be added.' in caplog.text
assert str(error.value) == f'400 Client Error: NOT FOUND for url: {define_metadata_pointer_url}'

View File

@@ -0,0 +1,183 @@
# standard imports
import json
# external imports
from chainlib.hash import strip_0x
# local imports
from cic_ussd.account.balance import get_cached_available_balance
from cic_ussd.account.metadata import get_cached_preferred_language
from cic_ussd.account.statement import (
get_cached_statement,
parse_statement_transactions,
statement_transaction_set
)
from cic_ussd.account.tokens import get_default_token_symbol
from cic_ussd.account.transaction import from_wei, to_wei
from cic_ussd.menu.ussd_menu import UssdMenu
from cic_ussd.metadata import PersonMetadata
from cic_ussd.phone_number import Support
from cic_ussd.processor.menu import response
from cic_ussd.processor.util import parse_person_metadata
from cic_ussd.translation import translation_for
# test imports
def test_menu_processor(activated_account,
balances,
cache_balances,
cache_default_token_data,
cache_preferences,
cache_person_metadata,
cache_statement,
celery_session_worker,
generic_ussd_session,
init_database,
load_chain_spec,
load_support_phone,
load_ussd_menu,
mock_sync_balance_api_query,
mock_transaction_list_query,
valid_recipient):
preferred_language = get_cached_preferred_language(activated_account.blockchain_address)
available_balance = get_cached_available_balance(activated_account.blockchain_address)
token_symbol = get_default_token_symbol()
tax = ''
bonus = ''
display_key = 'ussd.kenya.account_balances'
ussd_menu = UssdMenu.find_by_name('account_balances')
name = ussd_menu.get('name')
resp = response(activated_account, display_key, name, init_database, generic_ussd_session)
assert resp == translation_for(display_key,
preferred_language,
available_balance=available_balance,
tax=tax,
bonus=bonus,
token_symbol=token_symbol)
cached_statement = get_cached_statement(activated_account.blockchain_address)
statement = json.loads(cached_statement)
statement_transactions = parse_statement_transactions(statement)
transaction_sets = [statement_transactions[tx:tx + 3] for tx in range(0, len(statement_transactions), 3)]
first_transaction_set = []
middle_transaction_set = []
last_transaction_set = []
if transaction_sets:
first_transaction_set = statement_transaction_set(preferred_language, transaction_sets[0])
if len(transaction_sets) >= 2:
middle_transaction_set = statement_transaction_set(preferred_language, transaction_sets[1])
if len(transaction_sets) >= 3:
last_transaction_set = statement_transaction_set(preferred_language, transaction_sets[2])
display_key = 'ussd.kenya.first_transaction_set'
ussd_menu = UssdMenu.find_by_name('first_transaction_set')
name = ussd_menu.get('name')
resp = response(activated_account, display_key, name, init_database, generic_ussd_session)
assert resp == translation_for(display_key, preferred_language, first_transaction_set=first_transaction_set)
display_key = 'ussd.kenya.middle_transaction_set'
ussd_menu = UssdMenu.find_by_name('middle_transaction_set')
name = ussd_menu.get('name')
resp = response(activated_account, display_key, name, init_database, generic_ussd_session)
assert resp == translation_for(display_key, preferred_language, middle_transaction_set=middle_transaction_set)
display_key = 'ussd.kenya.last_transaction_set'
ussd_menu = UssdMenu.find_by_name('last_transaction_set')
name = ussd_menu.get('name')
resp = response(activated_account, display_key, name, init_database, generic_ussd_session)
assert resp == translation_for(display_key, preferred_language, last_transaction_set=last_transaction_set)
display_key = 'ussd.kenya.display_user_metadata'
ussd_menu = UssdMenu.find_by_name('display_user_metadata')
name = ussd_menu.get('name')
identifier = bytes.fromhex(strip_0x(activated_account.blockchain_address))
person_metadata = PersonMetadata(identifier)
cached_person_metadata = person_metadata.get_cached_metadata()
resp = response(activated_account, display_key, name, init_database, generic_ussd_session)
assert resp == parse_person_metadata(cached_person_metadata, display_key, preferred_language)
display_key = 'ussd.kenya.account_balances_pin_authorization'
ussd_menu = UssdMenu.find_by_name('account_balances_pin_authorization')
name = ussd_menu.get('name')
resp = response(activated_account, display_key, name, init_database, generic_ussd_session)
assert resp == translation_for(f'{display_key}.first', preferred_language)
activated_account.failed_pin_attempts = 1
resp = response(activated_account, display_key, name, init_database, generic_ussd_session)
retry_pin_entry = translation_for('ussd.kenya.retry_pin_entry', preferred_language, remaining_attempts=2)
assert resp == translation_for(f'{display_key}.retry', preferred_language, retry_pin_entry=retry_pin_entry)
activated_account.failed_pin_attempts = 0
display_key = 'ussd.kenya.start'
ussd_menu = UssdMenu.find_by_name('start')
name = ussd_menu.get('name')
resp = response(activated_account, display_key, name, init_database, generic_ussd_session)
assert resp == translation_for(display_key,
preferred_language,
account_balance=available_balance,
account_token_name=token_symbol)
display_key = 'ussd.kenya.transaction_pin_authorization'
ussd_menu = UssdMenu.find_by_name('transaction_pin_authorization')
name = ussd_menu.get('name')
generic_ussd_session['data'] = {
'recipient_phone_number': valid_recipient.phone_number,
'transaction_amount': '15'
}
resp = response(activated_account, display_key, name, init_database, generic_ussd_session)
user_input = generic_ussd_session.get('data').get('transaction_amount')
transaction_amount = to_wei(value=int(user_input))
tx_recipient_information = valid_recipient.standard_metadata_id()
tx_sender_information = activated_account.standard_metadata_id()
assert resp == translation_for(f'{display_key}.first',
preferred_language,
recipient_information=tx_recipient_information,
transaction_amount=from_wei(transaction_amount),
token_symbol=token_symbol,
sender_information=tx_sender_information)
display_key = 'ussd.kenya.exit_insufficient_balance'
ussd_menu = UssdMenu.find_by_name('exit_insufficient_balance')
name = ussd_menu.get('name')
generic_ussd_session['data'] = {
'recipient_phone_number': valid_recipient.phone_number,
'transaction_amount': '85'
}
transaction_amount = generic_ussd_session.get('data').get('transaction_amount')
transaction_amount = to_wei(value=int(transaction_amount))
resp = response(activated_account, display_key, name, init_database, generic_ussd_session)
assert resp == translation_for(display_key,
preferred_language,
amount=from_wei(transaction_amount),
token_symbol=token_symbol,
recipient_information=tx_recipient_information,
token_balance=available_balance)
display_key = 'ussd.kenya.exit_invalid_menu_option'
ussd_menu = UssdMenu.find_by_name('exit_invalid_menu_option')
name = ussd_menu.get('name')
resp = response(activated_account, display_key, name, init_database, generic_ussd_session)
assert resp == translation_for(display_key, preferred_language, support_phone=Support.phone_number)
display_key = 'ussd.kenya.exit_successful_transaction'
ussd_menu = UssdMenu.find_by_name('exit_successful_transaction')
name = ussd_menu.get('name')
generic_ussd_session['data'] = {
'recipient_phone_number': valid_recipient.phone_number,
'transaction_amount': '15'
}
transaction_amount = generic_ussd_session.get('data').get('transaction_amount')
transaction_amount = to_wei(value=int(transaction_amount))
resp = response(activated_account, display_key, name, init_database, generic_ussd_session)
assert resp == translation_for(display_key,
preferred_language,
transaction_amount=from_wei(transaction_amount),
token_symbol=token_symbol,
recipient_information=tx_recipient_information,
sender_information=tx_sender_information)

View File

@@ -0,0 +1,121 @@
# standard imports
import json
import os
import time
# external imports
import i18n
import requests_mock
from chainlib.hash import strip_0x
# local imports
from cic_ussd.account.chain import Chain
from cic_ussd.account.metadata import get_cached_preferred_language
from cic_ussd.cache import cache_data, cache_data_key, get_cached_data
from cic_ussd.db.models.task_tracker import TaskTracker
from cic_ussd.menu.ussd_menu import UssdMenu
from cic_ussd.metadata import PersonMetadata
from cic_ussd.processor.ussd import get_menu, handle_menu, handle_menu_operations
from cic_ussd.translation import translation_for
# test imports
from tests.helpers.accounts import phone_number
def test_handle_menu(activated_account,
init_cache,
init_database,
load_ussd_menu,
pending_account,
persisted_ussd_session,
pin_blocked_account,
preferences):
persisted_ussd_session.state = 'account_creation_prompt'
menu_resp = handle_menu(activated_account, init_database)
ussd_menu = UssdMenu.find_by_name('start')
assert menu_resp.get('name') == ussd_menu.get('name')
persisted_ussd_session.state = 'display_user_metadata'
menu_resp = handle_menu(activated_account, init_database)
ussd_menu = UssdMenu.find_by_name('display_user_metadata')
assert menu_resp.get('name') == ussd_menu.get('name')
menu_resp = handle_menu(pin_blocked_account, init_database)
ussd_menu = UssdMenu.find_by_name('exit_pin_blocked')
assert menu_resp.get('name') == ussd_menu.get('name')
menu_resp = handle_menu(pending_account, init_database)
ussd_menu = UssdMenu.find_by_name('initial_language_selection')
assert menu_resp.get('name') == ussd_menu.get('name')
identifier = bytes.fromhex(strip_0x(pending_account.blockchain_address))
key = cache_data_key(identifier, ':cic.preferences')
cache_data(key, json.dumps(preferences))
time.sleep(2)
menu_resp = handle_menu(pending_account, init_database)
ussd_menu = UssdMenu.find_by_name('initial_pin_entry')
assert menu_resp.get('name') == ussd_menu.get('name')
def test_get_menu(activated_account,
cache_preferences,
cache_person_metadata,
generic_ussd_session,
init_database,
init_state_machine,
persisted_ussd_session):
menu_resp = get_menu(activated_account, init_database, '', generic_ussd_session)
ussd_menu = UssdMenu.find_by_name(name='exit_invalid_input')
assert menu_resp.get('name') == ussd_menu.get('name')
menu_resp = get_menu(activated_account, init_database, '1111', None)
ussd_menu = UssdMenu.find_by_name('initial_language_selection')
assert menu_resp.get('name') == ussd_menu.get('name')
generic_ussd_session['state'] = 'start'
menu_resp = get_menu(activated_account, init_database, '1', generic_ussd_session)
ussd_menu = UssdMenu.find_by_name(name='enter_transaction_recipient')
assert menu_resp.get('name') == ussd_menu.get('name')
def test_handle_menu_operations(activated_account,
cache_preferences,
celery_session_worker,
generic_ussd_session,
init_database,
init_cache,
load_chain_spec,
load_config,
mock_account_creation_task_result,
persisted_ussd_session,
person_metadata,
set_locale_files,
setup_metadata_request_handler,
setup_metadata_signer,
task_uuid):
# sourcery skip: extract-duplicate-method
chain_str = Chain.spec.__str__()
phone = phone_number()
external_session_id = os.urandom(20).hex()
valid_service_codes = load_config.get('USSD_SERVICE_CODE').split(",")
preferred_language = i18n.config.get('fallback')
resp = handle_menu_operations(chain_str, external_session_id, phone, None, valid_service_codes[0], init_database, '4444')
assert resp == translation_for('ussd.kenya.account_creation_prompt', preferred_language)
cached_ussd_session = get_cached_data(external_session_id)
ussd_session = json.loads(cached_ussd_session)
assert ussd_session['msisdn'] == phone
task_tracker = init_database.query(TaskTracker).filter_by(task_uuid=task_uuid).first()
assert task_tracker.task_uuid == task_uuid
cached_creation_task_uuid = get_cached_data(task_uuid)
creation_task_uuid_data = json.loads(cached_creation_task_uuid)
assert creation_task_uuid_data['status'] == 'PENDING'
identifier = bytes.fromhex(strip_0x(activated_account.blockchain_address))
person_metadata_client = PersonMetadata(identifier)
with requests_mock.Mocker(real_http=False) as request_mocker:
request_mocker.register_uri('GET', person_metadata_client.url, status_code=200, reason='OK',
json=person_metadata)
person_metadata_client.query()
external_session_id = os.urandom(20).hex()
phone = activated_account.phone_number
preferred_language = get_cached_preferred_language(activated_account.blockchain_address)
persisted_ussd_session.state = 'enter_transaction_recipient'
resp = handle_menu_operations(chain_str, external_session_id, phone, None, valid_service_codes[0], init_database, '1')
assert resp == translation_for('ussd.kenya.enter_transaction_recipient', preferred_language)

View File

@@ -0,0 +1,62 @@
# standard imports
import datetime
import json
# external imports
import pytest
from chainlib.hash import strip_0x
from cic_types.models.person import get_contact_data_from_vcard
# local imports
from cic_ussd.account.metadata import get_cached_preferred_language
from cic_ussd.metadata import PersonMetadata
from cic_ussd.processor.util import latest_input, parse_person_metadata, resume_last_ussd_session
from cic_ussd.translation import translation_for
# test imports
@pytest.mark.parametrize('user_input, expected_value', [
('1*9*6*7', '7'),
('1', '1'),
('', '')
])
def test_latest_input(user_input, expected_value):
assert latest_input(user_input) == expected_value
def test_parse_person_metadata(activated_account, cache_person_metadata, cache_preferences):
identifier = bytes.fromhex(strip_0x(activated_account.blockchain_address))
person_metadata = PersonMetadata(identifier)
cached_person_metadata = person_metadata.get_cached_metadata()
person_metadata = json.loads(cached_person_metadata)
preferred_language = get_cached_preferred_language(activated_account.blockchain_address)
display_key = 'ussd.kenya.display_person_metadata'
parsed_person_metadata = parse_person_metadata(cached_person_metadata,
display_key,
preferred_language)
contact_data = get_contact_data_from_vcard(person_metadata.get('vcard'))
full_name = f'{contact_data.get("given")} {contact_data.get("family")}'
date_of_birth = person_metadata.get('date_of_birth')
year_of_birth = date_of_birth.get('year')
present_year = datetime.datetime.now().year
age = present_year - year_of_birth
gender = person_metadata.get('gender')
products = ', '.join(person_metadata.get('products'))
location = person_metadata.get('location').get('area_name')
assert parsed_person_metadata == translation_for(key=display_key,
preferred_language=preferred_language,
full_name=full_name,
age=age,
gender=gender,
location=location,
products=products)
@pytest.mark.parametrize('last_state, expected_menu_name', [
('account_creation_prompt', 'start'),
('enter_transaction_recipient', 'enter_transaction_recipient')
])
def test_resume_last_ussd_session(expected_menu_name, last_state, load_ussd_menu):
assert resume_last_ussd_session(last_state).get('name') == expected_menu_name

View File

@@ -0,0 +1,72 @@
# standard imports
import json
# external imports
# local imports
from cic_ussd.cache import get_cached_data
from cic_ussd.db.models.ussd_session import UssdSession as PersistedUssdSession
from cic_ussd.menu.ussd_menu import UssdMenu
from cic_ussd.session.ussd_session import (create_or_update_session,
create_ussd_session,
persist_ussd_session,
save_session_data,
update_ussd_session,
UssdSession)
# test imports
def test_ussd_session(cached_ussd_session, load_ussd_menu):
assert UssdMenu.find_by_name(name='initial_language_selection').get('name') == cached_ussd_session.state
cached_ussd_session.set_data('some_key', 'some_value')
assert cached_ussd_session.get_data('some_key') == 'some_value'
assert isinstance(cached_ussd_session, UssdSession)
assert isinstance(cached_ussd_session.to_json(), dict)
def test_create_or_update_session(activated_account_ussd_session, cached_ussd_session, init_cache, init_database):
external_session_id = activated_account_ussd_session.get('external_session_id')
ussd_session = create_or_update_session(external_session_id=external_session_id,
service_code=activated_account_ussd_session.get('service_code'),
msisdn=activated_account_ussd_session.get('msisdn'),
user_input=activated_account_ussd_session.get('user_input'),
state=activated_account_ussd_session.get('state'),
session=init_database)
cached_ussd_session = get_cached_data(external_session_id)
assert json.loads(cached_ussd_session).get('external_session_id') == ussd_session.external_session_id
def test_update_ussd_session(activated_account_ussd_session, cached_ussd_session, init_cache, load_ussd_menu):
ussd_session = create_ussd_session(external_session_id=activated_account_ussd_session.get('external_session_id'),
service_code=activated_account_ussd_session.get('service_code'),
msisdn=activated_account_ussd_session.get('msisdn'),
user_input=activated_account_ussd_session.get('user_input'),
state=activated_account_ussd_session.get('state'))
assert ussd_session.user_input == activated_account_ussd_session.get('user_input')
assert ussd_session.state == activated_account_ussd_session.get('state')
ussd_session = update_ussd_session(ussd_session=ussd_session, user_input='1*2', state='initial_pin_entry')
assert ussd_session.user_input == '1*2'
assert ussd_session.state == 'initial_pin_entry'
def test_save_session_data(activated_account_ussd_session,
cached_ussd_session,
celery_session_worker,
init_cache,
init_database,
ussd_session_data):
external_session_id = activated_account_ussd_session.get('external_session_id')
ussd_session = get_cached_data(external_session_id)
ussd_session = json.loads(ussd_session)
assert ussd_session.get('data') == {}
save_session_data(
queue='cic-ussd',
data=ussd_session_data,
ussd_session=cached_ussd_session.to_json(),
session=init_database
)
ussd_session = get_cached_data(external_session_id)
ussd_session = json.loads(ussd_session)
assert ussd_session.get('data') == ussd_session_data

View File

@@ -1,11 +0,0 @@
# standard imports
import json
# local imports
from cic_ussd.menu.ussd_menu import UssdMenu
from cic_ussd.session.ussd_session import UssdSession
def test_ussd_session(load_ussd_menu, get_in_redis_ussd_session):
ussd_session = get_in_redis_ussd_session
assert UssdMenu.find_by_name(name='initial_language_selection').get('name') == ussd_session.state

View File

@@ -0,0 +1,154 @@
# standard imports
import json
# external imports
import pytest
import requests_mock
from chainlib.hash import strip_0x
from cic_types.models.person import Person, get_contact_data_from_vcard
# local imports
from cic_ussd.cache import get_cached_data
from cic_ussd.account.maps import gender
from cic_ussd.account.metadata import get_cached_preferred_language
from cic_ussd.db.enum import AccountStatus
from cic_ussd.metadata import PreferencesMetadata
from cic_ussd.state_machine.logic.account import (change_preferred_language,
edit_user_metadata_attribute,
parse_gender,
parse_person_metadata,
save_complete_person_metadata,
save_metadata_attribute_to_session_data,
update_account_status_to_active)
from cic_ussd.translation import translation_for
# test imports
@pytest.mark.parametrize('user_input, expected_preferred_language', [
('1', 'en'),
('2', 'sw')
])
def test_change_preferred_language(activated_account,
celery_session_worker,
expected_preferred_language,
init_database,
generic_ussd_session,
mock_response,
preferences,
setup_metadata_request_handler,
user_input):
identifier = bytes.fromhex(strip_0x(activated_account.blockchain_address))
preferences_metadata_client = PreferencesMetadata(identifier)
with requests_mock.Mocker(real_http=False) as requests_mocker:
requests_mocker.register_uri(
'POST', preferences_metadata_client.url, status_code=200, reason='OK', json=mock_response
)
state_machine_data = (user_input, generic_ussd_session, activated_account, init_database)
res = change_preferred_language(state_machine_data)
init_database.commit()
assert res.id is not None
assert activated_account.preferred_language == expected_preferred_language
@pytest.mark.parametrize('user_input', [
'1',
'2',
'3'
])
def test_parse_gender(activated_account, cache_preferences, user_input):
preferred_language = get_cached_preferred_language(activated_account.blockchain_address)
parsed_gender = parse_gender(activated_account, user_input)
r_user_input = gender().get(user_input)
assert parsed_gender == translation_for(f'helpers.{r_user_input}', preferred_language)
def test_parse_person_metadata(activated_account, load_chain_spec, raw_person_metadata):
parsed_user_metadata = parse_person_metadata(activated_account, raw_person_metadata)
person = Person()
user_metadata = person.deserialize(parsed_user_metadata)
assert parsed_user_metadata == user_metadata.serialize()
@pytest.mark.parametrize("current_state, expected_key, expected_result, user_input", [
("enter_given_name", "given_name", "John", "John"),
("enter_family_name", "family_name", "Doe", "Doe"),
("enter_location", "location", "Kangemi", "Kangemi"),
("enter_products", "products", "Mandazi", "Mandazi"),
])
def test_save_metadata_attribute_to_session_data(activated_account,
cached_ussd_session,
celery_session_worker,
current_state,
expected_key,
expected_result,
init_cache,
init_database,
load_chain_spec,
set_locale_files,
persisted_ussd_session,
user_input):
persisted_ussd_session.state = current_state
ussd_session = persisted_ussd_session.to_json()
state_machine_data = (user_input, ussd_session, activated_account, init_database)
ussd_session_in_cache = get_cached_data(cached_ussd_session.external_session_id)
ussd_session_in_cache = json.loads(ussd_session_in_cache)
assert ussd_session_in_cache.get('data') == {}
ussd_session['state'] = current_state
save_metadata_attribute_to_session_data(state_machine_data)
cached_ussd_session = get_cached_data(cached_ussd_session.external_session_id)
cached_ussd_session = json.loads(cached_ussd_session)
assert cached_ussd_session.get('data')[expected_key] == expected_result
def test_update_account_status_to_active(generic_ussd_session, init_database, pending_account):
state_machine_data = ('', generic_ussd_session, pending_account, init_database)
assert pending_account.get_status(init_database) == AccountStatus.PENDING.name
update_account_status_to_active(state_machine_data)
assert pending_account.get_status(init_database) == AccountStatus.ACTIVE.name
def test_save_complete_person_metadata(activated_account,
cached_ussd_session,
celery_session_worker,
init_database,
load_chain_spec,
mocker,
person_metadata,
raw_person_metadata):
ussd_session = get_cached_data(cached_ussd_session.external_session_id)
ussd_session = json.loads(ussd_session)
ussd_session['data'] = raw_person_metadata
metadata = parse_person_metadata(activated_account, raw_person_metadata)
state_machine_data = ('', ussd_session, activated_account, init_database)
mocked_create_metadata_task = mocker.patch('cic_ussd.tasks.metadata.create_person_metadata.apply_async')
save_complete_person_metadata(state_machine_data=state_machine_data)
mocked_create_metadata_task.assert_called_with(
(activated_account.blockchain_address, metadata), {}, queue='cic-ussd')
def test_edit_user_metadata_attribute(activated_account,
cache_person_metadata,
cached_ussd_session,
celery_session_worker,
init_cache,
init_database,
load_chain_spec,
mocker,
person_metadata):
ussd_session = get_cached_data(cached_ussd_session.external_session_id)
ussd_session = json.loads(ussd_session)
assert person_metadata['location']['area_name'] == 'kayaba'
ussd_session['data'] = {'location': 'nairobi'}
contact_data = get_contact_data_from_vcard(person_metadata.get('vcard'))
phone_number = contact_data.get('tel')
activated_account.phone_number = phone_number
state_machine_data = ('', ussd_session, activated_account, init_database)
mocked_edit_metadata = mocker.patch('cic_ussd.tasks.metadata.create_person_metadata.apply_async')
edit_user_metadata_attribute(state_machine_data)
person_metadata['date_registered'] = int(activated_account.created.replace().timestamp())
person_metadata['location']['area_name'] = 'nairobi'
mocked_edit_metadata.assert_called_with(
(activated_account.blockchain_address, person_metadata), {}, queue='cic-ussd')

View File

@@ -1,21 +0,0 @@
# local imports
from cic_ussd.state_machine.logic.menu import (menu_one_selected,
menu_two_selected,
menu_three_selected,
menu_four_selected)
def test_menu_selection(create_pending_user, create_in_db_ussd_session):
serialized_in_db_ussd_session = create_in_db_ussd_session.to_json()
assert menu_one_selected(('1', serialized_in_db_ussd_session, create_pending_user)) is True
assert menu_one_selected(('x', serialized_in_db_ussd_session, create_pending_user)) is False
assert menu_two_selected(('2', serialized_in_db_ussd_session, create_pending_user)) is True
assert menu_two_selected(('1', serialized_in_db_ussd_session, create_pending_user)) is False
assert menu_three_selected(('3', serialized_in_db_ussd_session, create_pending_user)) is True
assert menu_three_selected(('4', serialized_in_db_ussd_session, create_pending_user)) is False
assert menu_four_selected(('4', serialized_in_db_ussd_session, create_pending_user)) is True
assert menu_four_selected(('d', serialized_in_db_ussd_session, create_pending_user)) is False

View File

@@ -0,0 +1,37 @@
# standard imports
# external imports
# local imports
from cic_ussd.state_machine.logic.menu import (menu_one_selected,
menu_two_selected,
menu_three_selected,
menu_four_selected,
menu_five_selected,
menu_six_selected,
menu_zero_zero_selected,
menu_ninety_nine_selected)
# test imports
def test_menu_selection(init_database, pending_account, persisted_ussd_session):
ussd_session = persisted_ussd_session.to_json()
assert menu_one_selected(('1', ussd_session, pending_account, init_database)) is True
assert menu_one_selected(('x', ussd_session, pending_account, init_database)) is False
assert menu_two_selected(('2', ussd_session, pending_account, init_database)) is True
assert menu_two_selected(('1', ussd_session, pending_account, init_database)) is False
assert menu_three_selected(('3', ussd_session, pending_account, init_database)) is True
assert menu_three_selected(('4', ussd_session, pending_account, init_database)) is False
assert menu_four_selected(('4', ussd_session, pending_account, init_database)) is True
assert menu_four_selected(('d', ussd_session, pending_account, init_database)) is False
assert menu_five_selected(('5', ussd_session, pending_account, init_database)) is True
assert menu_five_selected(('e', ussd_session, pending_account, init_database)) is False
assert menu_six_selected(('6', ussd_session, pending_account, init_database)) is True
assert menu_six_selected(('8', ussd_session, pending_account, init_database)) is False
assert menu_zero_zero_selected(('00', ussd_session, pending_account, init_database)) is True
assert menu_zero_zero_selected(('/', ussd_session, pending_account, init_database)) is False
assert menu_ninety_nine_selected(('99', ussd_session, pending_account, init_database)) is True
assert menu_ninety_nine_selected(('d', ussd_session, pending_account, init_database)) is False

View File

@@ -1,94 +0,0 @@
# standards imports
import json
# third party imports
import pytest
# local imports
from cic_ussd.encoder import check_password_hash, create_password_hash
from cic_ussd.state_machine.logic.pin import (complete_pin_change,
is_valid_pin,
is_valid_new_pin,
is_authorized_pin,
is_blocked_pin,
pins_match,
save_initial_pin_to_session_data)
def test_complete_pin_change(init_database, create_pending_user, create_in_db_ussd_session):
serialized_in_db_ussd_session = create_in_db_ussd_session.to_json()
state_machine_data = ('1212', serialized_in_db_ussd_session, create_pending_user)
assert create_pending_user.password_hash is None
create_in_db_ussd_session.set_data(key='initial_pin', session=init_database, value=create_password_hash('1212'))
complete_pin_change(state_machine_data)
assert create_pending_user.password_hash is not None
assert create_pending_user.verify_password(password='1212') is True
@pytest.mark.parametrize('user_input, expected', [
('4562', True),
('jksu', False),
('ij45', False),
])
def test_is_valid_pin(create_pending_user, create_in_db_ussd_session, user_input, expected):
serialized_in_db_ussd_session = create_in_db_ussd_session.to_json()
state_machine_data = (user_input, serialized_in_db_ussd_session, create_pending_user)
assert is_valid_pin(state_machine_data) is expected
@pytest.mark.parametrize('user_input, expected', [
('1212', True),
('0000', False)
])
def test_pins_match(init_database, create_pending_user, create_in_db_ussd_session, user_input, expected):
serialized_in_db_ussd_session = create_in_db_ussd_session.to_json()
state_machine_data = (user_input, serialized_in_db_ussd_session, create_pending_user)
create_in_db_ussd_session.set_data(key='initial_pin', session=init_database, value=create_password_hash(user_input))
assert pins_match(state_machine_data) is True
def test_save_initial_pin_to_session_data(create_pending_user,
create_in_redis_ussd_session,
create_in_db_ussd_session,
celery_session_worker):
serialized_in_db_ussd_session = create_in_db_ussd_session.to_json()
state_machine_data = ('1212', serialized_in_db_ussd_session, create_pending_user)
save_initial_pin_to_session_data(state_machine_data)
external_session_id = create_in_db_ussd_session.external_session_id
in_memory_ussd_session = create_in_redis_ussd_session.get(external_session_id)
in_memory_ussd_session = json.loads(in_memory_ussd_session)
assert check_password_hash(
password='1212', hashed_password=in_memory_ussd_session.get('session_data')['initial_pin'])
@pytest.mark.parametrize('user_input, expected_result', [
('1212', False),
('0000', True)
])
def test_is_authorized_pin(create_activated_user, create_in_db_ussd_session, expected_result, user_input):
serialized_in_db_ussd_session = create_in_db_ussd_session.to_json()
state_machine_data = (user_input, serialized_in_db_ussd_session, create_activated_user)
assert is_authorized_pin(state_machine_data=state_machine_data) is expected_result
def test_is_not_blocked_pin(create_activated_user, create_in_db_ussd_session):
serialized_in_db_ussd_session = create_in_db_ussd_session.to_json()
state_machine_data = ('', serialized_in_db_ussd_session, create_activated_user)
assert is_blocked_pin(state_machine_data=state_machine_data) is False
def test_is_blocked_pin(create_pin_blocked_user, create_in_db_ussd_session):
serialized_in_db_ussd_session = create_in_db_ussd_session.to_json()
alt_state_machine_data = ('user_input', serialized_in_db_ussd_session, create_pin_blocked_user)
assert is_blocked_pin(state_machine_data=alt_state_machine_data) is True
@pytest.mark.parametrize('user_input, expected_result', [
('1212', True),
('0000', False)
])
def test_is_valid_new_pin(create_activated_user, create_in_db_ussd_session, expected_result, user_input):
serialized_in_db_ussd_session = create_in_db_ussd_session.to_json()
state_machine_data = (user_input, serialized_in_db_ussd_session, create_activated_user)
assert is_valid_new_pin(state_machine_data=state_machine_data) is expected_result

View File

@@ -0,0 +1,101 @@
# standard imports
import json
# external imports
import pytest
# local imports
from cic_ussd.cache import get_cached_data
from cic_ussd.encoder import check_password_hash, create_password_hash
from cic_ussd.state_machine.logic.pin import (complete_pin_change,
is_valid_pin,
is_valid_new_pin,
is_authorized_pin,
is_blocked_pin,
is_locked_account,
pins_match,
save_initial_pin_to_session_data)
def test_complete_pin_change(activated_account, cached_ussd_session, init_database):
state_machine_data = ('1212', cached_ussd_session.to_json(), activated_account, init_database)
assert activated_account.password_hash is not None
cached_ussd_session.set_data('initial_pin', create_password_hash('1212'))
complete_pin_change(state_machine_data)
assert activated_account.verify_password('1212') is True
@pytest.mark.parametrize('user_input, expected', [
('4562', True),
('jksu', False),
('ij45', False),
])
def test_is_valid_pin(activated_account, expected, generic_ussd_session, init_database, user_input):
state_machine_data = (user_input, generic_ussd_session, activated_account, init_database)
assert is_valid_pin(state_machine_data) is expected
@pytest.mark.parametrize('user_input', [
'1212',
'0000'
])
def test_pins_match(activated_account, cached_ussd_session, init_cache, init_database, user_input):
state_machine_data = (user_input, cached_ussd_session.to_json(), activated_account, init_database)
cached_ussd_session.set_data('initial_pin', create_password_hash(user_input))
assert pins_match(state_machine_data) is True
def test_save_initial_pin_to_session_data(activated_account,
cached_ussd_session,
celery_session_worker,
init_cache,
init_database,
persisted_ussd_session,
set_fernet_key):
state_machine_data = ('1212', cached_ussd_session.to_json(), activated_account, init_database)
save_initial_pin_to_session_data(state_machine_data)
ussd_session = get_cached_data(cached_ussd_session.external_session_id)
ussd_session = json.loads(ussd_session)
assert check_password_hash('1212', ussd_session.get('data')['initial_pin'])
cached_ussd_session.set_data('some_key', 'some_value')
state_machine_data = ('1212', cached_ussd_session.to_json(), activated_account, init_database)
save_initial_pin_to_session_data(state_machine_data)
ussd_session = get_cached_data(cached_ussd_session.external_session_id)
ussd_session = json.loads(ussd_session)
assert ussd_session.get('data')['some_key'] == 'some_value'
@pytest.mark.parametrize('user_input, expected_result', [
('1212', False),
('0000', True)
])
def test_is_authorized_pin(activated_account, cached_ussd_session, expected_result, init_database, user_input):
state_machine_data = (user_input, cached_ussd_session.to_json(), activated_account, init_database)
assert is_authorized_pin(state_machine_data) is expected_result
def test_is_not_blocked_pin(activated_account, cached_ussd_session, init_database):
state_machine_data = ('', cached_ussd_session.to_json(), activated_account, init_database)
assert is_blocked_pin(state_machine_data) is False
def test_is_blocked_pin(cached_ussd_session, init_database, pin_blocked_account):
state_machine_data = ('user_input', cached_ussd_session, pin_blocked_account, init_database)
assert is_blocked_pin(state_machine_data) is True
def test_is_locked_account(activated_account, generic_ussd_session, init_database, pin_blocked_account):
state_machine_data = ('', generic_ussd_session, activated_account, init_database)
assert is_locked_account(state_machine_data) is False
state_machine_data = ('', generic_ussd_session, pin_blocked_account, init_database)
assert is_locked_account(state_machine_data) is True
@pytest.mark.parametrize('user_input, expected_result', [
('1212', True),
('0000', False)
])
def test_is_valid_new_pin(activated_account, cached_ussd_session, expected_result, init_database, user_input):
state_machine_data = (user_input, cached_ussd_session.to_json(), activated_account, init_database)
assert is_valid_new_pin(state_machine_data) is expected_result

View File

@@ -1,36 +0,0 @@
# standard imports
# third-party imports
import pytest
# local imports
from cic_ussd.state_machine.logic.sms import (send_terms_to_user_if_required,
process_mini_statement_request,
upsell_unregistered_recipient)
def test_send_terms_to_user_if_required(caplog,
create_in_db_ussd_session,
create_activated_user):
serialized_in_db_ussd_session = create_in_db_ussd_session.to_json()
state_machine_data = ('', serialized_in_db_ussd_session, create_activated_user)
send_terms_to_user_if_required(state_machine_data=state_machine_data)
assert 'Requires integration to cic-notify.' in caplog.text
def test_process_mini_statement_request(caplog,
create_in_db_ussd_session,
create_activated_user):
serialized_in_db_ussd_session = create_in_db_ussd_session.to_json()
state_machine_data = ('', serialized_in_db_ussd_session, create_activated_user)
process_mini_statement_request(state_machine_data=state_machine_data)
assert 'Requires integration to cic-notify.' in caplog.text
def test_upsell_unregistered_recipient(caplog,
create_in_db_ussd_session,
create_activated_user):
serialized_in_db_ussd_session = create_in_db_ussd_session.to_json()
state_machine_data = ('', serialized_in_db_ussd_session, create_activated_user)
upsell_unregistered_recipient(state_machine_data=state_machine_data)
assert 'Requires integration to cic-notify.' in caplog.text

View File

@@ -0,0 +1,42 @@
# standard imports
import json
# external imports
# local imports
from cic_ussd.account.metadata import get_cached_preferred_language
from cic_ussd.account.tokens import get_default_token_symbol
from cic_ussd.cache import get_cached_data
from cic_ussd.phone_number import Support
from cic_ussd.state_machine.logic.sms import upsell_unregistered_recipient
from cic_ussd.translation import translation_for
# tests imports
def test_upsell_unregistered_recipient(activated_account,
cache_default_token_data,
cache_preferences,
cached_ussd_session,
init_database,
load_support_phone,
mock_notifier_api,
set_locale_files,
valid_recipient):
cached_ussd_session.set_data('recipient_phone_number', valid_recipient.phone_number)
state_machine_data = ('', cached_ussd_session.to_json(), activated_account, init_database)
upsell_unregistered_recipient(state_machine_data)
ussd_session = get_cached_data(cached_ussd_session.external_session_id)
ussd_session = json.loads(ussd_session)
phone_number = ussd_session.get('data')['recipient_phone_number']
preferred_language = get_cached_preferred_language(activated_account.blockchain_address)
token_symbol = get_default_token_symbol()
tx_sender_information = activated_account.standard_metadata_id()
message = translation_for('sms.upsell_unregistered_recipient',
preferred_language,
tx_sender_information=tx_sender_information,
token_symbol=token_symbol,
support_phone=Support.phone_number)
assert mock_notifier_api.get('message') == message
assert mock_notifier_api.get('recipient') == phone_number

View File

@@ -1,111 +0,0 @@
# standard imports
import json
# third-party imports
import pytest
# local imports
from cic_ussd.state_machine.logic.transaction import (has_sufficient_balance,
is_valid_recipient,
is_valid_transaction_amount,
process_transaction_request,
save_recipient_phone_to_session_data,
save_transaction_amount_to_session_data)
from cic_ussd.redis import InMemoryStore
@pytest.mark.parametrize("amount, expected_result", [
('50', True),
('', False)
])
def test_is_valid_transaction_amount(create_activated_user, create_in_db_ussd_session, amount, expected_result):
state_machine_data = (amount, create_in_db_ussd_session, create_activated_user)
validity = is_valid_transaction_amount(state_machine_data=state_machine_data)
assert validity == expected_result
def test_save_recipient_phone_to_session_data(create_activated_user,
create_in_db_ussd_session,
celery_session_worker,
create_in_redis_ussd_session,
init_database):
phone_number = '+254712345678'
in_memory_ussd_session = InMemoryStore.cache.get('AT974186')
in_memory_ussd_session = json.loads(in_memory_ussd_session)
assert in_memory_ussd_session.get('session_data') == {}
serialized_in_db_ussd_session = create_in_db_ussd_session.to_json()
state_machine_data = (phone_number, serialized_in_db_ussd_session, create_activated_user)
save_recipient_phone_to_session_data(state_machine_data=state_machine_data)
in_memory_ussd_session = InMemoryStore.cache.get('AT974186')
in_memory_ussd_session = json.loads(in_memory_ussd_session)
assert in_memory_ussd_session.get('session_data')['recipient_phone_number'] == phone_number
def test_save_transaction_amount_to_session_data(create_activated_user,
create_in_db_ussd_session,
celery_session_worker,
create_in_redis_ussd_session,
init_database):
transaction_amount = '100'
in_memory_ussd_session = InMemoryStore.cache.get('AT974186')
in_memory_ussd_session = json.loads(in_memory_ussd_session)
assert in_memory_ussd_session.get('session_data') == {}
serialized_in_db_ussd_session = create_in_db_ussd_session.to_json()
state_machine_data = (transaction_amount, serialized_in_db_ussd_session, create_activated_user)
save_transaction_amount_to_session_data(state_machine_data=state_machine_data)
in_memory_ussd_session = InMemoryStore.cache.get('AT974186')
in_memory_ussd_session = json.loads(in_memory_ussd_session)
assert in_memory_ussd_session.get('session_data')['transaction_amount'] == transaction_amount
@pytest.mark.parametrize("test_value, expected_result", [
('45', True),
('75', False)
])
def test_has_sufficient_balance(mock_balance,
create_in_db_ussd_session,
create_valid_tx_sender,
expected_result,
test_value):
mock_balance(60)
serialized_in_db_ussd_session = create_in_db_ussd_session.to_json()
state_machine_data = (test_value, serialized_in_db_ussd_session, create_valid_tx_sender)
result = has_sufficient_balance(state_machine_data=state_machine_data)
assert result == expected_result
@pytest.mark.parametrize("test_value, expected_result", [
('+25498765432', True),
('+25498765433', False)
])
def test_is_valid_recipient(create_in_db_ussd_session,
create_valid_tx_recipient,
create_valid_tx_sender,
expected_result,
test_value):
serialized_in_db_ussd_session = create_in_db_ussd_session.to_json()
state_machine_data = (test_value, serialized_in_db_ussd_session, create_valid_tx_sender)
result = is_valid_recipient(state_machine_data=state_machine_data)
assert result == expected_result
def test_process_transaction_request(create_valid_tx_recipient,
create_valid_tx_sender,
load_config,
mock_outgoing_transactions,
setup_chain_spec,
ussd_session_data):
ussd_session_data['session_data'] = {
'recipient_phone_number': create_valid_tx_recipient.phone_number,
'transaction_amount': '50'
}
state_machine_data = ('', ussd_session_data, create_valid_tx_sender)
process_transaction_request(state_machine_data=state_machine_data)
assert mock_outgoing_transactions[0].get('amount') == 50.0
assert mock_outgoing_transactions[0].get('token_symbol') == 'SRF'

View File

@@ -0,0 +1,112 @@
# standard imports
import json
# external imports
import pytest
import requests_mock
from chainlib.hash import strip_0x
# local imports
from cic_ussd.account.transaction import to_wei
from cic_ussd.cache import get_cached_data
from cic_ussd.metadata import PersonMetadata
from cic_ussd.state_machine.logic.transaction import (is_valid_recipient,
is_valid_transaction_amount,
has_sufficient_balance,
process_transaction_request,
retrieve_recipient_metadata,
save_recipient_phone_to_session_data,
save_transaction_amount_to_session_data)
# test imports
def test_is_valid_recipient(activated_account,
generic_ussd_session,
init_database,
load_e164_region,
pending_account,
valid_recipient):
state_machine = ('0112365478', generic_ussd_session, valid_recipient, init_database)
assert is_valid_recipient(state_machine) is False
state_machine = (pending_account.phone_number, generic_ussd_session, valid_recipient, init_database)
assert is_valid_recipient(state_machine) is False
state_machine = (valid_recipient.phone_number, generic_ussd_session, activated_account, init_database)
assert is_valid_recipient(state_machine) is True
@pytest.mark.parametrize("amount, expected_result", [
('50', True),
('', False)
])
def test_is_valid_transaction_amount(activated_account, amount, expected_result, generic_ussd_session, init_database):
state_machine_data = (amount, generic_ussd_session, activated_account, init_database)
assert is_valid_transaction_amount(state_machine_data) is expected_result
@pytest.mark.parametrize("value, expected_result", [
('45', True),
('75', False)
])
def test_has_sufficient_balance(activated_account,
cache_balances,
expected_result,
generic_ussd_session,
init_database,
value):
state_machine_data = (value, generic_ussd_session, activated_account, init_database)
assert has_sufficient_balance(state_machine_data=state_machine_data) == expected_result
def test_process_transaction_request(activated_account,
cache_default_token_data,
cached_ussd_session,
celery_session_worker,
init_cache,
init_database,
load_chain_spec,
load_config,
mock_transfer_api,
valid_recipient):
cached_ussd_session.set_data('recipient_phone_number', valid_recipient.phone_number)
cached_ussd_session.set_data('transaction_amount', '50')
ussd_session = get_cached_data(cached_ussd_session.external_session_id)
ussd_session = json.loads(ussd_session)
state_machine_data = ('', ussd_session, activated_account, init_database)
process_transaction_request(state_machine_data)
assert mock_transfer_api['from_address'] == activated_account.blockchain_address
assert mock_transfer_api['to_address'] == valid_recipient.blockchain_address
assert mock_transfer_api['value'] == to_wei(50)
assert mock_transfer_api['token_symbol'] == load_config.get('TEST_TOKEN_SYMBOL')
def test_retrieve_recipient_metadata(activated_account,
generic_ussd_session,
init_database,
load_chain_spec,
mocker,
valid_recipient):
state_machine_data = (valid_recipient.phone_number, generic_ussd_session, activated_account, init_database)
mocked_query_metadata = mocker.patch('cic_ussd.tasks.metadata.query_person_metadata.apply_async')
retrieve_recipient_metadata(state_machine_data)
mocked_query_metadata.assert_called_with((valid_recipient.blockchain_address, ), {}, queue='cic-ussd')
def test_transaction_information_to_session_data(activated_account,
cached_ussd_session,
init_cache,
init_database,
load_e164_region,
valid_recipient):
assert cached_ussd_session.to_json()['data'] == {}
state_machine_data = (valid_recipient.phone_number, cached_ussd_session.to_json(), activated_account, init_database)
save_recipient_phone_to_session_data(state_machine_data)
ussd_session = get_cached_data(cached_ussd_session.external_session_id)
ussd_session = json.loads(ussd_session)
assert ussd_session.get('data')['recipient_phone_number'] == valid_recipient.phone_number
state_machine_data = ('25', cached_ussd_session.to_json(), activated_account, init_database)
save_transaction_amount_to_session_data(state_machine_data)
ussd_session = get_cached_data(cached_ussd_session.external_session_id)
ussd_session = json.loads(ussd_session)
assert ussd_session.get('data')['transaction_amount'] == '25'

View File

@@ -1,155 +0,0 @@
# standard imports
import json
# third-party-imports
import pytest
# local imports
from cic_ussd.chain import Chain
from cic_ussd.redis import InMemoryStore
from cic_ussd.state_machine.logic.user import (
change_preferred_language_to_en,
change_preferred_language_to_sw,
edit_user_metadata_attribute,
format_user_metadata,
get_user_metadata,
save_complete_user_metadata,
process_gender_user_input,
save_metadata_attribute_to_session_data,
update_account_status_to_active)
def test_change_preferred_language(create_pending_user, create_in_db_ussd_session):
state_machine_data = ('', create_in_db_ussd_session, create_pending_user)
assert create_pending_user.preferred_language is None
change_preferred_language_to_en(state_machine_data)
assert create_pending_user.preferred_language == 'en'
change_preferred_language_to_sw(state_machine_data)
assert create_pending_user.preferred_language == 'sw'
def test_update_account_status_to_active(create_pending_user, create_in_db_ussd_session):
state_machine_data = ('', create_in_db_ussd_session, create_pending_user)
update_account_status_to_active(state_machine_data)
assert create_pending_user.get_account_status() == 'ACTIVE'
@pytest.mark.parametrize("current_state, expected_key, expected_result, user_input", [
("enter_given_name", "given_name", "John", "John"),
("enter_family_name", "family_name", "Doe", "Doe"),
("enter_gender", "gender", "Male", "1"),
("enter_location", "location", "Kangemi", "Kangemi"),
("enter_products", "products", "Mandazi", "Mandazi"),
])
def test_save_metadata_attribute_to_session_data(current_state,
expected_key,
expected_result,
user_input,
celery_session_worker,
create_activated_user,
create_in_db_ussd_session,
create_in_redis_ussd_session):
create_in_db_ussd_session.state = current_state
serialized_in_db_ussd_session = create_in_db_ussd_session.to_json()
state_machine_data = (user_input, serialized_in_db_ussd_session, create_activated_user)
in_memory_ussd_session = InMemoryStore.cache.get('AT974186')
in_memory_ussd_session = json.loads(in_memory_ussd_session)
assert in_memory_ussd_session.get('session_data') == {}
serialized_in_db_ussd_session['state'] = current_state
save_metadata_attribute_to_session_data(state_machine_data=state_machine_data)
in_memory_ussd_session = InMemoryStore.cache.get('AT974186')
in_memory_ussd_session = json.loads(in_memory_ussd_session)
assert in_memory_ussd_session.get('session_data')[expected_key] == expected_result
@pytest.mark.parametrize("preferred_language, user_input, expected_gender_value", [
("en", "1", "Male"),
("en", "2", "Female"),
("sw", "1", "Mwanaume"),
("sw", "2", "Mwanamke"),
])
def test_process_gender_user_input(create_activated_user, expected_gender_value, preferred_language, user_input):
create_activated_user.preferred_language = preferred_language
gender = process_gender_user_input(user=create_activated_user, user_input=user_input)
assert gender == expected_gender_value
def test_format_user_metadata(create_activated_user,
complete_user_metadata,
setup_chain_spec):
from cic_types.models.person import Person
formatted_user_metadata = format_user_metadata(metadata=complete_user_metadata, user=create_activated_user)
person = Person()
user_metadata = person.deserialize(person_data=formatted_user_metadata)
assert formatted_user_metadata == user_metadata.serialize()
def test_save_complete_user_metadata(celery_session_worker,
complete_user_metadata,
create_activated_user,
create_in_redis_ussd_session,
mocker,
setup_chain_spec,
ussd_session_data):
ussd_session = create_in_redis_ussd_session.get(ussd_session_data.get('external_session_id'))
ussd_session = json.loads(ussd_session)
ussd_session['session_data'] = complete_user_metadata
user_metadata = format_user_metadata(metadata=ussd_session.get('session_data'), user=create_activated_user)
state_machine_data = ('', ussd_session, create_activated_user)
mocked_create_metadata_task = mocker.patch('cic_ussd.tasks.metadata.create_person_metadata.apply_async')
save_complete_user_metadata(state_machine_data=state_machine_data)
mocked_create_metadata_task.assert_called_with(
(user_metadata, create_activated_user.blockchain_address),
{},
queue='cic-ussd'
)
def test_edit_user_metadata_attribute(celery_session_worker,
cached_user_metadata,
create_activated_user,
create_in_redis_ussd_session,
init_redis_cache,
mocker,
person_metadata,
setup_chain_spec,
ussd_session_data):
ussd_session = create_in_redis_ussd_session.get(ussd_session_data.get('external_session_id'))
ussd_session = json.loads(ussd_session)
assert person_metadata['location']['area_name'] == 'kayaba'
# appropriately format session
ussd_session['session_data'] = {
'location': 'nairobi'
}
state_machine_data = ('', ussd_session, create_activated_user)
mocked_edit_metadata = mocker.patch('cic_ussd.tasks.metadata.edit_person_metadata.apply_async')
edit_user_metadata_attribute(state_machine_data=state_machine_data)
person_metadata['location']['area_name'] = 'nairobi'
mocked_edit_metadata.assert_called_with(
(create_activated_user.blockchain_address, person_metadata, Chain.spec.engine()),
{},
queue='cic-ussd'
)
def test_get_user_metadata_attribute(celery_session_worker,
create_activated_user,
create_in_redis_ussd_session,
mocker,
ussd_session_data):
ussd_session = create_in_redis_ussd_session.get(ussd_session_data.get('external_session_id'))
ussd_session = json.loads(ussd_session)
state_machine_data = ('', ussd_session, create_activated_user)
mocked_get_metadata = mocker.patch('cic_ussd.tasks.metadata.query_person_metadata.apply_async')
get_user_metadata(state_machine_data=state_machine_data)
mocked_get_metadata.assert_called_with(
(create_activated_user.blockchain_address,),
{},
queue='cic-ussd'
)

View File

@@ -1,55 +1,54 @@
# standard imports
import json
# third-party imports
# external imports
import pytest
from cic_types.models.person import generate_metadata_pointer
# local imports
from cic_ussd.metadata import blockchain_address_to_metadata_pointer
from cic_ussd.redis import cache_data
from cic_ussd.state_machine.logic.validator import (is_valid_name,
from cic_ussd.state_machine.logic.validator import (has_cached_person_metadata,
is_valid_name,
is_valid_gender_selection,
has_cached_user_metadata)
is_valid_date)
# test imports
def test_has_cached_person_metadata(activated_account,
cache_person_metadata,
generic_ussd_session,
init_database,
pending_account):
state_machine_data = ('', generic_ussd_session, activated_account, init_database)
assert has_cached_person_metadata(state_machine_data) is True
state_machine_data = ('', generic_ussd_session, pending_account, init_database)
assert has_cached_person_metadata(state_machine_data) is False
@pytest.mark.parametrize("user_input, expected_result", [
("Arya", True),
("1234", False)
])
def test_is_valid_name(create_in_db_ussd_session, create_pending_user, user_input, expected_result):
serialized_in_db_ussd_session = create_in_db_ussd_session.to_json()
state_machine_data = (user_input, serialized_in_db_ussd_session, create_pending_user)
result = is_valid_name(state_machine_data=state_machine_data)
assert result is expected_result
def test_is_valid_name(expected_result, generic_ussd_session, init_database, pending_account, user_input):
def test_has_cached_user_metadata(create_in_db_ussd_session,
create_activated_user,
init_redis_cache,
person_metadata):
serialized_in_db_ussd_session = create_in_db_ussd_session.to_json()
state_machine_data = ('', serialized_in_db_ussd_session, create_activated_user)
result = has_cached_user_metadata(state_machine_data=state_machine_data)
assert result is False
# cache metadata
user = create_activated_user
key = generate_metadata_pointer(
identifier=blockchain_address_to_metadata_pointer(blockchain_address=user.blockchain_address),
cic_type=':cic.person'
)
cache_data(key=key, data=json.dumps(person_metadata))
result = has_cached_user_metadata(state_machine_data=state_machine_data)
assert result
state_machine_data = (user_input, generic_ussd_session, pending_account, init_database)
assert is_valid_name(state_machine_data) is expected_result
@pytest.mark.parametrize("user_input, expected_result", [
("1", True),
("2", True),
("3", True),
("4", False)
])
def test_is_valid_gender_selection(expected_result, generic_ussd_session, init_database, pending_account, user_input):
state_machine_data = (user_input, generic_ussd_session, pending_account, init_database)
assert is_valid_gender_selection(state_machine_data) is expected_result
@pytest.mark.parametrize("user_input, expected_result", [
("1935", True),
("1825", False),
("3", False)
])
def test_is_valid_gender_selection(create_in_db_ussd_session, create_pending_user, user_input, expected_result):
serialized_in_db_ussd_session = create_in_db_ussd_session.to_json()
state_machine_data = (user_input, serialized_in_db_ussd_session, create_pending_user)
result = is_valid_gender_selection(state_machine_data=state_machine_data)
assert result is expected_result
def test_is_valid_date(expected_result, generic_ussd_session, init_database, pending_account, user_input):
state_machine_data = (user_input, generic_ussd_session, pending_account, init_database)
assert is_valid_date(state_machine_data) is expected_result

View File

@@ -2,11 +2,12 @@
from cic_ussd.state_machine import UssdStateMachine
def test_state_machine(create_in_db_ussd_session,
get_in_redis_ussd_session,
load_data_into_state_machine,
create_pending_user):
serialized_in_db_ussd_session = create_in_db_ussd_session.to_json()
state_machine = UssdStateMachine(ussd_session=get_in_redis_ussd_session.to_json())
state_machine.scan_data(('1', serialized_in_db_ussd_session, create_pending_user))
def test_state_machine(activated_account_ussd_session,
celery_session_worker,
init_database,
init_state_machine,
pending_account):
state_machine = UssdStateMachine(activated_account_ussd_session)
state_machine.scan_data(('1', activated_account_ussd_session, pending_account, init_database))
assert state_machine.__repr__() == f'<KenyaUssdStateMachine: {state_machine.state}>'
assert state_machine.state == 'initial_pin_entry'

View File

@@ -0,0 +1,178 @@
# standard imports
import json
from decimal import Decimal
# external imports
import celery
import pytest
import requests_mock
from chainlib.hash import strip_0x
# local imports
from cic_ussd.account.statement import generate, filter_statement_transactions
from cic_ussd.account.transaction import transaction_actors
from cic_ussd.cache import cache_data_key, get_cached_data
from cic_ussd.db.models.account import Account
from cic_ussd.error import AccountCreationDataNotFound
from cic_ussd.metadata import PreferencesMetadata
# test imports
from tests.helpers.accounts import blockchain_address
def test_account_creation_callback(account_creation_data,
cache_account_creation_data,
celery_session_worker,
custom_metadata,
init_cache,
init_database,
load_chain_spec,
mocker,
setup_metadata_request_handler,
setup_metadata_signer):
phone_number = account_creation_data.get('phone_number')
result = blockchain_address()
task_uuid = account_creation_data.get('task_uuid')
mock_task = mocker.patch('celery.app.task.Task.request')
mock_task.root_id = task_uuid
mock_task.delivery_info = {'routing_key': 'cic-ussd'}
status_code = 1
with pytest.raises(ValueError) as error:
s_account_creation_callback = celery.signature(
'cic_ussd.tasks.callback_handler.account_creation_callback', [task_uuid, '', status_code]
)
s_account_creation_callback.apply_async().get()
assert str(error.value) == f'Unexpected status code: {status_code}'
cached_account_creation_data = get_cached_data(task_uuid)
cached_account_creation_data = json.loads(cached_account_creation_data)
assert cached_account_creation_data.get('status') == account_creation_data.get('status')
mock_add_phone_pointer = mocker.patch('cic_ussd.tasks.metadata.add_phone_pointer.apply_async')
mock_add_custom_metadata = mocker.patch('cic_ussd.tasks.metadata.add_custom_metadata.apply_async')
s_account_creation_callback = celery.signature(
'cic_ussd.tasks.callback_handler.account_creation_callback', [result, '', 0]
)
s_account_creation_callback.apply_async().get()
account = init_database.query(Account).filter_by(phone_number=phone_number).first()
assert account.blockchain_address == result
cached_account_creation_data = get_cached_data(task_uuid)
cached_account_creation_data = json.loads(cached_account_creation_data)
assert cached_account_creation_data.get('status') == 'CREATED'
mock_add_phone_pointer.assert_called_with((result, phone_number), {}, queue='cic-ussd')
mock_add_custom_metadata.assert_called_with((result, custom_metadata), {}, queue='cic-ussd')
task_uuid = celery.uuid()
mock_task.root_id = task_uuid
with pytest.raises(AccountCreationDataNotFound) as error:
s_account_creation_callback = celery.signature(
'cic_ussd.tasks.callback_handler.account_creation_callback', [task_uuid, '', 0]
)
s_account_creation_callback.apply_async().get()
assert str(error.value) == f'No account creation data found for task id: {task_uuid}'
def test_balances_callback(activated_account, balances, celery_session_worker):
status_code = 1
with pytest.raises(ValueError) as error:
s_balances_callback = celery.signature(
'cic_ussd.tasks.callback_handler.balances_callback',
[balances, activated_account.blockchain_address, status_code])
s_balances_callback.apply_async().get()
assert str(error.value) == f'Unexpected status code: {status_code}.'
status_code = 0
s_balances_callback = celery.signature(
'cic_ussd.tasks.callback_handler.balances_callback',
[balances, activated_account.blockchain_address, status_code])
s_balances_callback.apply_async().get()
identifier = bytes.fromhex(strip_0x(activated_account.blockchain_address))
key = cache_data_key(identifier, ':cic.balances')
cached_balances = get_cached_data(key)
cached_balances = json.loads(cached_balances)
assert cached_balances == balances[0]
def test_statement_callback(activated_account, mocker, transactions_list):
status_code = 1
with pytest.raises(ValueError) as error:
s_statement_callback = celery.signature(
'cic_ussd.tasks.callback_handler.statement_callback',
[transactions_list, activated_account.blockchain_address, status_code])
s_statement_callback.apply_async().get()
assert str(error.value) == f'Unexpected status code: {status_code}.'
mock_task = mocker.patch('celery.app.task.Task.request')
mock_task.delivery_info = {'routing_key': 'cic-ussd'}
mock_statement_generate = mocker.patch('cic_ussd.tasks.processor.generate_statement.apply_async')
status_code = 0
s_statement_callback = celery.signature(
'cic_ussd.tasks.callback_handler.statement_callback',
[transactions_list, activated_account.blockchain_address, status_code])
s_statement_callback.apply_async().get()
statement_transactions = filter_statement_transactions(transactions_list)
recipient_transaction, sender_transaction = transaction_actors(statement_transactions[0])
mock_statement_generate.assert_called_with(
(activated_account.blockchain_address, sender_transaction), {}, queue='cic-ussd')
def test_transaction_balances_callback(activated_account,
balances,
cache_balances,
cache_person_metadata,
cache_preferences,
load_chain_spec,
mocker,
preferences,
setup_metadata_signer,
setup_metadata_request_handler,
set_locale_files,
transaction_result):
status_code = 1
recipient_transaction, sender_transaction = transaction_actors(transaction_result)
with pytest.raises(ValueError) as error:
s_transaction_balances_callback = celery.signature(
'cic_ussd.tasks.callback_handler.transaction_balances_callback',
[balances, sender_transaction, status_code])
s_transaction_balances_callback.apply_async().get()
assert str(error.value) == f'Unexpected status code: {status_code}.'
mocked_chain = mocker.patch('celery.chain')
mock_task_request = mocker.patch('celery.app.task.Task.request')
mock_task_request.delivery_info = {'routing_key': 'cic-ussd'}
sender_transaction['transaction_type'] = 'transfer'
status_code = 0
s_transaction_balances_callback = celery.signature(
'cic_ussd.tasks.callback_handler.transaction_balances_callback',
[balances, sender_transaction, status_code])
s_transaction_balances_callback.apply_async().get()
mocked_chain.assert_called()
sender_transaction['transaction_type'] = 'tokengift'
status_code = 0
s_transaction_balances_callback = celery.signature(
'cic_ussd.tasks.callback_handler.transaction_balances_callback',
[balances, sender_transaction, status_code])
s_transaction_balances_callback.apply_async().get()
mocked_chain.assert_called()
def test_transaction_callback(load_chain_spec, mock_async_balance_api_query, transaction_result):
status_code = 1
with pytest.raises(ValueError) as error:
s_transaction_callback = celery.signature(
'cic_ussd.tasks.callback_handler.transaction_callback',
[transaction_result, 'transfer', status_code])
s_transaction_callback.apply_async().get()
assert str(error.value) == f'Unexpected status code: {status_code}.'
status_code = 0
s_transaction_callback = celery.signature(
'cic_ussd.tasks.callback_handler.transaction_callback',
[transaction_result, 'transfer', status_code])
s_transaction_callback.apply_async().get()
recipient_transaction, sender_transaction = transaction_actors(transaction_result)
assert mock_async_balance_api_query.get('address') == recipient_transaction.get('blockchain_address') or sender_transaction.get('blockchain_address')
assert mock_async_balance_api_query.get('token_symbol') == recipient_transaction.get('token_symbol') or sender_transaction.get('token_symbol')

View File

@@ -1,203 +0,0 @@
# standard imports
import json
import logging
from datetime import datetime
# third party imports
import celery
import pytest
# local imports
from cic_ussd.db.models.account import Account
from cic_ussd.error import ActionDataNotFoundError
from cic_ussd.conversions import from_wei
logg = logging.getLogger()
def test_successful_process_account_creation_callback_task(account_creation_action_data,
celery_session_worker,
init_database,
init_redis_cache,
mocker,
set_account_creation_action_data):
phone_number = account_creation_action_data.get('phone_number')
task_id = account_creation_action_data.get('task_id')
mocked_task_request = mocker.patch('celery.app.task.Task.request')
# WARNING: [THE SETTING OF THE ROOT ID IS A HACK AND SHOULD BE REVIEWED OR IMPROVED]
mocked_task_request.root_id = task_id
user = init_database.query(Account).filter_by(phone_number=phone_number).first()
assert user is None
redis_cache = init_redis_cache
action_data = redis_cache.get(task_id)
action_data = json.loads(action_data)
assert action_data.get('status') == 'PENDING'
status_code = 0
result = '0x6315c185fd23bDbbba058E2a504197915aCC5065'
url = ''
s_process_callback_request = celery.signature(
'cic_ussd.tasks.callback_handler.process_account_creation_callback',
[result, url, status_code]
)
s_process_callback_request.apply_async().get()
user = init_database.query(Account).filter_by(phone_number=phone_number).first()
assert user.blockchain_address == result
action_data = redis_cache.get(task_id)
action_data = json.loads(action_data)
assert action_data.get('status') == 'CREATED'
def test_unsuccessful_process_account_creation_callback_task(init_database,
init_redis_cache,
celery_session_worker):
with pytest.raises(ActionDataNotFoundError) as error:
status_code = 0
result = '0x6315c185fd23bDbbba058E2a504197915aCC5065'
url = ''
s_process_callback_request = celery.signature(
'cic_ussd.tasks.callback_handler.process_account_creation_callback',
[result, url, status_code]
)
result = s_process_callback_request.apply_async()
task_id = result.get()
assert str(error.value) == f'Account creation task: {task_id}, returned unexpected response: {status_code}'
def test_successful_token_gift_incoming_transaction(celery_session_worker,
create_activated_user,
mock_notifier_api,
set_locale_files,
successful_incoming_token_gift_callback):
result = successful_incoming_token_gift_callback.get('RESULT')
param = successful_incoming_token_gift_callback.get('PARAM')
status_code = successful_incoming_token_gift_callback.get('STATUS_CODE')
s_process_token_gift = celery.signature(
'cic_ussd.tasks.callback_handler.process_incoming_transfer_callback',
[result, param, status_code]
)
s_process_token_gift.apply_async().get()
balance = from_wei(result.get('destination_value'))
token_symbol = result.get('token_symbol')
messages = mock_notifier_api
assert messages[0].get('recipient') == create_activated_user.phone_number
assert messages[0].get(
'message') == f'Hello {""} you have been registered on Sarafu Network! Your balance is {balance} {token_symbol}. To use dial *483*46#. For help 0757628885.'
def test_successful_transfer_incoming_transaction(celery_session_worker,
create_valid_tx_sender,
create_valid_tx_recipient,
mock_notifier_api,
set_locale_files,
successful_incoming_transfer_callback):
result = successful_incoming_transfer_callback.get('RESULT')
param = successful_incoming_transfer_callback.get('PARAM')
status_code = successful_incoming_transfer_callback.get('STATUS_CODE')
s_process_token_gift = celery.signature(
'cic_ussd.tasks.callback_handler.process_incoming_transfer_callback',
[result, param, status_code]
)
s_process_token_gift.apply_async().get()
value = result.get('destination_value')
balance = ''
token_symbol = result.get('token_symbol')
sender_first_name = ''
sender_last_name = ''
phone_number = create_valid_tx_sender.phone_number
tx_sender_information = f'{phone_number}, {sender_first_name}, {sender_last_name}'
amount = from_wei(value=value)
timestamp = datetime.now().strftime('%d-%m-%y, %H:%M %p')
messages = mock_notifier_api
assert messages[0].get('recipient') == create_valid_tx_recipient.phone_number
assert messages[0].get(
'message') == f'Successfully received {amount} {token_symbol} from {tx_sender_information} {timestamp}. New balance is {balance} {token_symbol}.'
def test_unsuccessful_incoming_transaction_recipient_not_found(celery_session_worker,
create_valid_tx_sender,
successful_incoming_transfer_callback):
result = successful_incoming_transfer_callback.get('RESULT')
param = successful_incoming_transfer_callback.get('PARAM')
status_code = successful_incoming_transfer_callback.get('STATUS_CODE')
with pytest.raises(ValueError) as error:
s_process_token_gift = celery.signature(
'cic_ussd.tasks.callback_handler.process_incoming_transfer_callback',
[result, param, status_code]
)
s_process_token_gift.apply_async().get()
recipient_blockchain_address = result.get('recipient')
assert str(error.value) == f'Tx for recipient: {recipient_blockchain_address} was received but has no matching user in the system.'
def test_successful_incoming_transaction_sender_not_found(caplog,
celery_session_worker,
create_valid_tx_recipient,
mock_notifier_api,
successful_incoming_transfer_callback):
result = successful_incoming_transfer_callback.get('RESULT')
param = successful_incoming_transfer_callback.get('PARAM')
status_code = successful_incoming_transfer_callback.get('STATUS_CODE')
s_process_token_gift = celery.signature(
'cic_ussd.tasks.callback_handler.process_incoming_transfer_callback',
[result, param, status_code]
)
s_process_token_gift.apply_async().get()
sender_blockchain_address = result.get('sender')
assert 'Balance requires implementation of cic-eth integration with balance.' in caplog.text
# assert f'Tx with sender: {sender_blockchain_address} was received but has no matching user in the system.\n' in caplog.text
def test_unsuccessful_incoming_transaction_invalid_status_code(celery_session_worker,
incoming_transfer_callback_invalid_tx_status_code):
result = incoming_transfer_callback_invalid_tx_status_code.get('RESULT')
param = incoming_transfer_callback_invalid_tx_status_code.get('PARAM')
status_code = incoming_transfer_callback_invalid_tx_status_code.get('STATUS_CODE')
with pytest.raises(ValueError) as error:
s_process_token_gift = celery.signature(
'cic_ussd.tasks.callback_handler.process_incoming_transfer_callback',
[result, param, status_code]
)
s_process_token_gift.apply_async().get()
assert str(error.value) == f'Unexpected status code: {status_code}'
def test_unsuccessful_incoming_transaction_invalid_param(celery_session_worker,
incoming_transfer_callback_invalid_tx_param):
result = incoming_transfer_callback_invalid_tx_param.get('RESULT')
param = incoming_transfer_callback_invalid_tx_param.get('PARAM')
status_code = incoming_transfer_callback_invalid_tx_param.get('STATUS_CODE')
with pytest.raises(ValueError) as error:
s_process_token_gift = celery.signature(
'cic_ussd.tasks.callback_handler.process_incoming_transfer_callback',
[result, param, status_code]
)
s_process_token_gift.apply_async().get()
assert str(error.value) == f'Unexpected transaction: param {status_code}'

View File

@@ -0,0 +1,52 @@
# standard imports
import json
import os
# external imports
import celery
import requests_mock
from chainlib.hash import strip_0x
# local imports
from cic_ussd.cache import cache_data_key, get_cached_data
from cic_ussd.metadata import PersonMetadata, PreferencesMetadata
# tests imports
def test_query_person_metadata(activated_account,
celery_session_worker,
init_cache,
person_metadata,
setup_metadata_request_handler,
setup_metadata_signer):
with requests_mock.Mocker(real_http=False) as request_mocker:
identifier = bytes.fromhex(strip_0x(activated_account.blockchain_address))
metadata_client = PersonMetadata(identifier)
request_mocker.register_uri('GET', metadata_client.url, json=person_metadata, reason='OK', status_code=200)
s_query_person_metadata = celery.signature(
'cic_ussd.tasks.metadata.query_person_metadata', [activated_account.blockchain_address])
s_query_person_metadata.apply().get()
key = cache_data_key(identifier, ':cic.person')
cached_person_metadata = get_cached_data(key)
cached_person_metadata = json.loads(cached_person_metadata)
assert cached_person_metadata == person_metadata
def test_query_preferences_metadata(activated_account,
celery_session_worker,
init_cache,
preferences,
setup_metadata_request_handler,
setup_metadata_signer):
with requests_mock.Mocker(real_http=False) as request_mocker:
identifier = bytes.fromhex(strip_0x(activated_account.blockchain_address))
metadata_client = PreferencesMetadata(identifier)
request_mocker.register_uri('GET', metadata_client.url, json=preferences, reason='OK', status_code=200)
query_preferences_metadata = celery.signature(
'cic_ussd.tasks.metadata.query_preferences_metadata', [activated_account.blockchain_address])
query_preferences_metadata.apply().get()
key = cache_data_key(identifier, ':cic.preferences')
cached_preferences_metadata = get_cached_data(key)
cached_preferences_metadata = json.loads(cached_preferences_metadata)
assert cached_preferences_metadata == preferences

View File

@@ -0,0 +1,71 @@
# standard imports
import datetime
# external imports
import celery
# local imports
from cic_ussd.account.transaction import from_wei
from cic_ussd.phone_number import Support
from cic_ussd.translation import translation_for
# tests imports
def test_transaction(celery_session_worker,
load_support_phone,
mock_notifier_api,
notification_data,
set_locale_files):
notification_data['transaction_type'] = 'transfer'
amount = from_wei(notification_data.get('token_value'))
balance = notification_data.get('available_balance')
phone_number = notification_data.get('phone_number')
preferred_language = notification_data.get('preferred_language')
token_symbol = notification_data.get('token_symbol')
transaction_account_metadata = notification_data.get('metadata_id')
timestamp = datetime.datetime.now().strftime('%d-%m-%y, %H:%M %p')
s_transaction = celery.signature(
'cic_ussd.tasks.notifications.transaction', [notification_data]
)
s_transaction.apply_async().get()
assert mock_notifier_api.get('recipient') == phone_number
message = translation_for(key='sms.sent_tokens',
phone_number=phone_number,
preferred_language=preferred_language,
amount=amount,
token_symbol=token_symbol,
tx_recipient_information=transaction_account_metadata,
timestamp=timestamp,
balance=balance)
assert mock_notifier_api.get('message') == message
notification_data['role'] = 'recipient'
notification_data['direction_tag'] = 'From'
s_transaction = celery.signature(
'cic_ussd.tasks.notifications.transaction', [notification_data]
)
s_transaction.apply_async().get()
message = translation_for(key='sms.received_tokens',
phone_number=phone_number,
preferred_language=preferred_language,
amount=amount,
token_symbol=token_symbol,
tx_sender_information=transaction_account_metadata,
timestamp=timestamp,
balance=balance)
assert mock_notifier_api.get('message') == message
notification_data['transaction_type'] = 'tokengift'
s_transaction = celery.signature(
'cic_ussd.tasks.notifications.transaction', [notification_data]
)
s_transaction.apply_async().get()
support_phone = Support.phone_number
message = translation_for(key='sms.account_successfully_created',
preferred_language=preferred_language,
balance=balance,
support_phone=support_phone,
token_symbol=token_symbol)
assert mock_notifier_api.get('message') == message

View File

@@ -1,46 +0,0 @@
# third party imports
import celery
import pytest
# local imports
from cic_ussd.db.models.ussd_session import UssdSession
from cic_ussd.error import SessionNotFoundError
def test_persist_session_to_db_task(
init_database,
create_activated_user,
ussd_session_data,
celery_session_worker,
create_in_redis_ussd_session):
external_session_id = ussd_session_data.get('external_session_id')
s_persist_session_to_db = celery.signature(
'cic_ussd.tasks.ussd_session.persist_session_to_db',
[external_session_id]
)
result = s_persist_session_to_db.apply_async()
result.get()
db_session = init_database.query(UssdSession).filter_by(external_session_id=external_session_id).first()
assert db_session.external_session_id == 'AT974186'
assert db_session.service_code == '*483*46#'
assert db_session.msisdn == '+25498765432'
assert db_session.user_input == '1'
assert db_session.state == 'initial_language_selection'
assert db_session.session_data is None
assert db_session.version == 2
assert UssdSession.have_session_for_phone(create_activated_user.phone_number)
def test_session_not_found_error(
celery_session_worker,
create_in_redis_ussd_session):
with pytest.raises(SessionNotFoundError) as error:
external_session_id = 'SomeRandomValue'
s_persist_session_to_db = celery.signature(
'cic_ussd.tasks.ussd_session.persist_session_to_db',
[external_session_id]
)
result = s_persist_session_to_db.apply_async()
result.get()
assert str(error.value) == "Session does not exist!"

View File

@@ -0,0 +1,75 @@
# standard imports
import json
# external imports
import celery
from chainlib.hash import strip_0x
# local imports
from cic_ussd.account.transaction import transaction_actors
from cic_ussd.cache import cache_data_key, get_cached_data
# test imports
def test_generate_statement(activated_account,
celery_session_worker,
cache_preferences,
mocker,
transaction_result):
mock_task = mocker.patch('celery.app.task.Task.request')
mock_task.delivery_info = {'routing_key': 'cic-ussd'}
mock_chain = mocker.patch('celery.chain')
recipient_transaction, sender_transaction = transaction_actors(transaction_result)
s_generate_statement = celery.signature(
'cic_ussd.tasks.processor.generate_statement', [activated_account.blockchain_address, sender_transaction]
)
result = s_generate_statement.apply_async().get()
mock_chain.assert_called_once()
def test_cache_statement(activated_account,
cache_person_metadata,
celery_session_worker,
init_database,
preferences,
transaction_result):
recipient_transaction, sender_transaction = transaction_actors(transaction_result)
identifier = bytes.fromhex(strip_0x(activated_account.blockchain_address))
key = cache_data_key(identifier, ':cic.statement')
cached_statement = get_cached_data(key)
assert cached_statement is None
s_parse_transaction = celery.signature(
'cic_ussd.tasks.processor.parse_transaction', [preferences, sender_transaction])
result = s_parse_transaction.apply_async().get()
s_cache_statement = celery.signature(
'cic_ussd.tasks.processor.cache_statement', [result, activated_account.blockchain_address]
)
s_cache_statement.apply_async().get()
cached_statement = get_cached_data(key)
cached_statement = json.loads(cached_statement)
assert len(cached_statement) == 1
s_cache_statement = celery.signature(
'cic_ussd.tasks.processor.cache_statement', [result, activated_account.blockchain_address]
)
s_cache_statement.apply_async().get()
cached_statement = get_cached_data(key)
cached_statement = json.loads(cached_statement)
assert len(cached_statement) == 2
def test_parse_transaction(activated_account,
cache_person_metadata,
celery_session_worker,
init_database,
preferences,
transaction_result):
recipient_transaction, sender_transaction = transaction_actors(transaction_result)
assert sender_transaction.get('metadata_id') is None
assert sender_transaction.get('phone_number') is None
s_parse_transaction = celery.signature(
'cic_ussd.tasks.processor.parse_transaction', [preferences, sender_transaction])
result = s_parse_transaction.apply_async().get()
assert result.get('metadata_id') == activated_account.standard_metadata_id()
assert result.get('phone_number') == activated_account.phone_number

View File

@@ -0,0 +1,33 @@
# standard imports
# external imports
import celery
import pytest
# local imports
from cic_ussd.db.models.ussd_session import UssdSession
from cic_ussd.error import SessionNotFoundError
# tests imports
def test_persist_session_to_db(cached_ussd_session, celery_session_worker, init_cache, init_database):
external_session_id = cached_ussd_session.external_session_id
s_persist_session_to_db = celery.signature(
'cic_ussd.tasks.ussd_session.persist_session_to_db', [external_session_id])
s_persist_session_to_db.apply_async().get()
ussd_session = init_database.query(UssdSession).filter_by(external_session_id=external_session_id).first()
assert ussd_session.external_session_id == cached_ussd_session.external_session_id
assert ussd_session.service_code == cached_ussd_session.service_code
assert ussd_session.msisdn == cached_ussd_session.msisdn
assert ussd_session.user_input == cached_ussd_session.user_input
assert ussd_session.state == cached_ussd_session.state
assert ussd_session.data is None
assert ussd_session.version == cached_ussd_session.version
assert UssdSession.has_record_for_phone_number(ussd_session.msisdn, init_database)
with pytest.raises(SessionNotFoundError) as error:
external_session_id = 'SomeRandomValue'
s_persist_session_to_db = celery.signature(
'cic_ussd.tasks.ussd_session.persist_session_to_db', [external_session_id])
result = s_persist_session_to_db.apply_async().get()
assert str(error.value) == "Session does not exist!"

View File

@@ -1,20 +0,0 @@
# standard imports
# third-party imports
# local imports
from cic_ussd.balance import BalanceManager
from cic_ussd.chain import Chain
def test_balance_manager(create_valid_tx_recipient, load_config, mocker, setup_chain_spec):
chain_str = Chain.spec.__str__()
balance_manager = BalanceManager(
address=create_valid_tx_recipient.blockchain_address,
chain_str=chain_str,
token_symbol='SRF'
)
balance_manager.get_balances = mocker.MagicMock()
balance_manager.get_balances()
balance_manager.get_balances.assert_called_once()

View File

@@ -0,0 +1,33 @@
# standard imports
import hashlib
import json
# external imports
# local imports
from cic_ussd.cache import cache_data, cache_data_key, get_cached_data
# test imports
def test_cache_data(init_cache):
identifier = 'some_key'.encode()
key = cache_data_key(identifier, ':testing')
assert get_cached_data(key) is None
cache_data(key, json.dumps('some_value'))
assert get_cached_data(key) is not None
def test_cache_data_key():
identifier = 'some_key'.encode()
key = cache_data_key(identifier, ':testing')
hash_object = hashlib.new("sha256")
hash_object.update(identifier)
hash_object.update(':testing'.encode(encoding="utf-8"))
assert hash_object.digest().hex() == key
def test_get_cached_data(cached_ussd_session):
ussd_session = get_cached_data(cached_ussd_session.external_session_id)
ussd_session = json.loads(ussd_session)
assert ussd_session.get('msisdn') == cached_ussd_session.msisdn

View File

@@ -1,6 +1,6 @@
# standard imports
# third-party imports
# external imports
import pytest
# local imports
@@ -14,19 +14,15 @@ from cic_ussd.notifications import Notifier
def test_send_sms_notification(celery_session_worker,
expected_message,
key,
mock_notifier_api,
preferred_language,
recipient,
set_locale_files,
mock_notifier_api):
set_locale_files):
notifier = Notifier()
notifier.queue = None
notifier.send_sms_notification(key=key, phone_number=recipient, preferred_language=preferred_language)
messages = mock_notifier_api
assert messages[0].get('message') == expected_message
assert messages[0].get('recipient') == recipient
assert mock_notifier_api.get('message') == expected_message
assert mock_notifier_api.get('recipient') == recipient

View File

@@ -1,243 +0,0 @@
# standard imports
import json
import uuid
# third party imports
import pytest
# local imports
from cic_ussd.chain import Chain
from cic_ussd.db.models.task_tracker import TaskTracker
from cic_ussd.menu.ussd_menu import UssdMenu
from cic_ussd.operations import (add_tasks_to_tracker,
create_ussd_session,
create_or_update_session,
define_response_with_content,
define_multilingual_responses,
get_account_status,
get_latest_input,
initiate_account_creation_request,
process_current_menu,
process_menu_interaction_requests,
cache_account_creation_task_id,
reset_pin,
update_ussd_session,
save_to_in_memory_ussd_session_data)
from cic_ussd.phone_number import get_user_by_phone_number,process_phone_number
from cic_ussd.transactions import truncate
from cic_ussd.redis import InMemoryStore
from cic_ussd.session.ussd_session import UssdSession as InMemoryUssdSession
def test_add_tasks_to_tracker(init_database):
task_uuid = '31e85315-feee-4b6d-995e-223569082cc4'
session = init_database
assert len(session.query(TaskTracker).all()) == 0
add_tasks_to_tracker(task_uuid=task_uuid)
task_in_tracker = session.query(TaskTracker).filter_by(task_uuid=task_uuid).first()
assert task_in_tracker.id == 1
assert task_in_tracker.task_uuid == task_uuid
def test_create_ussd_session(create_in_redis_ussd_session, ussd_session_data):
external_session_id = ussd_session_data.get('external_session_id')
ussd_session = create_ussd_session(
external_session_id=external_session_id,
service_code=ussd_session_data.get('service_code'),
phone=ussd_session_data.get('msisdn'),
user_input=ussd_session_data.get('user_input'),
current_menu=ussd_session_data.get('state')
)
in_memory_ussd_session = create_in_redis_ussd_session.get(external_session_id)
assert json.loads(in_memory_ussd_session).get('external_session_id') == ussd_session.external_session_id
def test_create_or_update_session(init_database, create_in_redis_ussd_session, ussd_session_data):
external_session_id = ussd_session_data.get('external_session_id')
ussd_session = create_or_update_session(external_session_id=external_session_id,
service_code=ussd_session_data.get('service_code'),
phone=ussd_session_data.get('msisdn'),
user_input=ussd_session_data.get('user_input'),
current_menu=ussd_session_data.get('state'))
in_memory_ussd_session = create_in_redis_ussd_session.get(external_session_id)
assert json.loads(in_memory_ussd_session).get('external_session_id') == ussd_session.external_session_id
@pytest.mark.parametrize('headers, response, expected_result',[
([('Content-Type', 'text/plain')], 'some-text', (b'some-text', [('Content-Type', 'text/plain'), ('Content-Length', '9')])),
([('Content-Type', 'text/plain'), ('Content-Length', '0')], 'some-text', (b'some-text', [('Content-Type', 'text/plain'), ('Content-Length', '9')]))
])
def test_define_response_with_content(headers, response, expected_result):
response_bytes, headers = define_response_with_content(headers=headers, response=response)
assert response_bytes, headers == expected_result
def test_define_multilingual_responses(load_ussd_menu, set_locale_files):
response = define_multilingual_responses(
key='ussd.kenya.account_creation_prompt', locales=['en', 'sw'], prefix='END')
assert response == "END Your account is being created. You will receive an SMS when your account is ready.\nAkaunti yako ya Sarafu inatayarishwa. Utapokea ujumbe wa SMS akaunti yako ikiwa tayari.\n"
def test_get_account_status(create_pending_user):
user = create_pending_user
assert get_account_status(user.phone_number) == 'PENDING'
@pytest.mark.parametrize('user_input, expected_value', [
('1*9*6*7', '7'),
('1', '1'),
('', '')
])
def test_get_latest_input(user_input, expected_value):
assert get_latest_input(user_input=user_input) == expected_value
def test_initiate_account_creation_request(account_creation_action_data,
create_in_redis_ussd_session,
init_database,
load_config,
load_ussd_menu,
mocker,
setup_chain_spec,
set_locale_files,
ussd_session_data):
external_session_id = ussd_session_data.get('external_session_id')
phone_number = account_creation_action_data.get('phone_number')
task_id = account_creation_action_data.get('task_id')
class Callable:
id = task_id
mocker.patch('cic_eth.api.api_task.Api.create_account', return_value=Callable)
mocked_cache_function = mocker.patch('cic_ussd.operations.cache_account_creation_task_id')
mocked_cache_function(phone_number, task_id)
chain_str = Chain.spec.__str__()
response = initiate_account_creation_request(chain_str=chain_str,
external_session_id=external_session_id,
phone_number=ussd_session_data.get('msisdn'),
service_code=ussd_session_data.get('service_code'),
user_input=ussd_session_data.get('user_input'))
in_memory_ussd_session = InMemoryUssdSession.redis_cache.get(external_session_id)
# check that ussd session was created
assert json.loads(in_memory_ussd_session).get('external_session_id') == external_session_id
assert response == "END Your account is being created. You will receive an SMS when your account is ready.\nAkaunti yako ya Sarafu inatayarishwa. Utapokea ujumbe wa SMS akaunti yako ikiwa tayari.\n"
def test_reset_pin(create_pin_blocked_user):
user = create_pin_blocked_user
assert user.get_account_status() == 'LOCKED'
reset_pin(user.phone_number)
assert user.get_account_status() == 'RESET'
def test_update_ussd_session(create_in_redis_ussd_session, load_ussd_menu, ussd_session_data):
external_session_id = ussd_session_data.get('external_session_id')
ussd_session = create_ussd_session(external_session_id=external_session_id,
service_code=ussd_session_data.get('service_code'),
phone=ussd_session_data.get('msisdn'),
user_input=ussd_session_data.get('user_input'),
current_menu=ussd_session_data.get('state')
)
assert ussd_session.user_input == ussd_session_data.get('user_input')
assert ussd_session.state == ussd_session_data.get('state')
ussd_session = update_ussd_session(ussd_session=ussd_session, user_input='1*2', current_menu='initial_pin_entry')
assert ussd_session.user_input == '1*2'
assert ussd_session.state == 'initial_pin_entry'
def test_process_current_menu(create_activated_user, create_in_db_ussd_session):
ussd_session = create_in_db_ussd_session
current_menu = process_current_menu(ussd_session=ussd_session, user=create_activated_user, user_input="")
assert current_menu == UssdMenu.find_by_name(name='exit_invalid_input')
current_menu = process_current_menu(ussd_session=None, user=create_activated_user, user_input="1*0000")
assert current_menu == UssdMenu.find_by_name(name='start')
def test_cache_account_creation_task_id(init_redis_cache):
phone_number = '+25412345678'
task_id = str(uuid.uuid4())
cache_account_creation_task_id(phone_number=phone_number, task_id=task_id)
redis_cache = init_redis_cache
action_data = redis_cache.get(task_id)
action_data = json.loads(action_data)
assert action_data.get('phone_number') == phone_number
assert action_data.get('sms_notification_sent') is False
assert action_data.get('status') == 'PENDING'
assert action_data.get('task_id') == task_id
def test_save_to_in_memory_ussd_session_data(celery_session_worker,
create_in_db_ussd_session,
create_in_redis_ussd_session,
init_database):
in_memory_ussd_session = InMemoryStore.cache.get('AT974186')
in_memory_ussd_session = json.loads(in_memory_ussd_session)
assert in_memory_ussd_session.get('session_data') == {}
session_data = {
'some_test_key': 'some_test_value'
}
save_to_in_memory_ussd_session_data(
queue='cic-ussd',
session_data=session_data,
ussd_session=create_in_db_ussd_session.to_json()
)
in_memory_ussd_session = InMemoryStore.cache.get('AT974186')
in_memory_ussd_session = json.loads(in_memory_ussd_session)
assert in_memory_ussd_session.get('session_data') == session_data
@pytest.mark.parametrize("external_session_id, phone_number, expected_response", [
("AT123456789", "+254700000000", "END Your account is being created. You will receive an SMS when your account is ready.\nAkaunti yako ya Sarafu inatayarishwa. Utapokea ujumbe wa SMS akaunti yako ikiwa tayari.\n"),
("AT974186", "+25498765432", "CON Please enter a PIN to manage your account.\n0. Back")
])
def test_process_menu_interaction_requests(external_session_id,
phone_number,
expected_response,
load_ussd_menu,
load_data_into_state_machine,
load_config,
setup_chain_spec,
celery_session_worker,
create_activated_user,
create_in_db_ussd_session):
chain_str = Chain.spec.__str__()
response = process_menu_interaction_requests(
chain_str=chain_str,
external_session_id=external_session_id,
phone_number=phone_number,
queue='cic-ussd',
service_code=load_config.get('APP_SERVICE_CODE'),
user_input='1'
)
assert response == expected_response
@pytest.mark.parametrize("phone_number, region, expected_result", [
("0712345678", "KE", "+254712345678"),
("+254787654321", "KE", "+254787654321")
])
def test_process_phone_number(expected_result, phone_number, region):
processed_phone_number = process_phone_number(phone_number=phone_number, region=region)
assert processed_phone_number == expected_result
def test_get_user_by_phone_number(create_activated_user):
known_phone_number = create_activated_user.phone_number
user = get_user_by_phone_number(phone_number=known_phone_number)
assert user is not None
assert create_activated_user.blockchain_address == user.blockchain_address
unknown_phone_number = '+254700000000'
user = get_user_by_phone_number(phone_number=unknown_phone_number)
assert user is None

View File

@@ -0,0 +1,18 @@
# standard imports
# external imports
import pytest
# local imports
from cic_ussd.phone_number import process_phone_number
# tests imports
@pytest.mark.parametrize("phone_number, region, expected_result", [
("0712345678", "KE", "+254712345678"),
("+254787654321", "KE", "+254787654321")
])
def test_process_phone_number(expected_result, phone_number, region):
processed_phone_number = process_phone_number(phone_number=phone_number, region=region)
assert processed_phone_number == expected_result

View File

@@ -1,130 +0,0 @@
# local imports
from cic_ussd.menu.ussd_menu import UssdMenu
from cic_ussd.processor import (custom_display_text,
next_state,
process_request,
process_pin_authorization,
process_transaction_pin_authorization,
process_exit_insufficient_balance,
process_exit_successful_transaction)
def test_process_pin_authorization(create_activated_user,
load_ussd_menu,
set_locale_files):
ussd_menu = UssdMenu.find_by_name(name='standard_pin_authorization')
response = process_pin_authorization(
display_key=ussd_menu.get('display_key'),
user=create_activated_user
)
assert response == 'CON Please enter your PIN.\n0. Back'
user_with_one_failed_pin_attempt = create_activated_user
user_with_one_failed_pin_attempt.failed_pin_attempts = 1
alt_response = process_pin_authorization(
display_key=ussd_menu.get('display_key'),
user=user_with_one_failed_pin_attempt,
)
assert alt_response == 'CON Please enter your PIN. You have 2 attempts remaining.\n0. Back'
def test_process_transaction_pin_authorization(create_activated_user,
create_in_db_ussd_session,
load_ussd_menu,
set_locale_files):
session_data = {
'recipient_phone_number': '+254700000000',
}
ussd_session = create_in_db_ussd_session.to_json()
ussd_session['session_data'] = session_data
ussd_session['user_input'] = '1*0700000000*120'
ussd_menu = UssdMenu.find_by_name(name='transaction_pin_authorization')
response = process_transaction_pin_authorization(
display_key=ussd_menu.get('display_key'),
user=create_activated_user,
ussd_session=ussd_session
)
assert response == 'CON +254700000000 will receive 120.00 SRF from +25498765432.\nPlease enter your PIN to confirm.\n0. Back'
def test_process_request_for_pending_user(load_ussd_menu, create_pending_user):
expected_menu = process_request(user_input="", user=create_pending_user)
assert expected_menu == UssdMenu.find_by_name(name='initial_language_selection')
def test_processor_request_for_activated_user(load_ussd_menu, create_activated_user):
expected_menu = process_request(user_input="", user=create_activated_user)
assert expected_menu == UssdMenu.find_by_name(name="start")
def test_next_state(load_data_into_state_machine, load_ussd_menu, create_in_db_ussd_session, create_pending_user):
assert create_in_db_ussd_session.state == "initial_language_selection"
successive_state = next_state(
ussd_session=create_in_db_ussd_session.to_json(),
user=create_pending_user,
user_input="1"
)
assert successive_state == "initial_pin_entry"
def test_custom_display_text(create_activated_user,
get_in_redis_ussd_session,
load_ussd_menu,
set_locale_files):
ussd_session = get_in_redis_ussd_session
user = create_activated_user
ussd_menu = UssdMenu.find_by_name(name='exit_invalid_request')
english_translation = custom_display_text(
display_key=ussd_menu.get('display_key'),
menu_name=ussd_menu.get('name'),
user=user,
ussd_session=ussd_session
)
user.preferred_language = 'sw'
swahili_translation = custom_display_text(
display_key=ussd_menu.get('display_key'),
menu_name=ussd_menu.get('name'),
user=user,
ussd_session=ussd_session
)
assert swahili_translation == 'END Chaguo si sahihi.'
assert english_translation == 'END Invalid request.'
def test_process_exit_insufficient_balance(
create_valid_tx_recipient,
load_ussd_menu,
mock_balance,
set_locale_files,
ussd_session_data):
mock_balance(50)
ussd_session_data['user_input'] = f'1*{create_valid_tx_recipient.phone_number}*75'
ussd_session_data['session_data'] = {'recipient_phone_number': create_valid_tx_recipient.phone_number}
ussd_session_data['display_key'] = 'exit_insufficient_balance'
ussd_menu = UssdMenu.find_by_name(name='exit_insufficient_balance')
response = process_exit_insufficient_balance(
display_key=ussd_menu.get('display_key'),
user=create_valid_tx_recipient,
ussd_session=ussd_session_data
)
assert response == 'CON Payment of 75.00 SRF to +25498765432 has failed due to insufficent balance.\nYour Sarafu-Network balances is: 50.00\n00. Back\n99. Exit'
def test_process_exit_successful_transaction(
create_valid_tx_recipient,
create_valid_tx_sender,
load_ussd_menu,
set_locale_files,
ussd_session_data):
ussd_session_data['session_data'] = {
'recipient_phone_number': create_valid_tx_recipient.phone_number,
'transaction_amount': 75
}
ussd_session_data['display_key'] = 'exit_successful_transaction'
ussd_menu = UssdMenu.find_by_name(name='exit_successful_transaction')
response = process_exit_successful_transaction(
display_key=ussd_menu.get('display_key'),
user=create_valid_tx_sender,
ussd_session=ussd_session_data
)
assert response == 'CON Your request has been sent. +25498765432 will receive 75.00 SRF from +25498765433.\n00. Back\n99. Exit'

View File

@@ -1,65 +0,0 @@
# standard imports
import json
# local imports
from cic_ussd.db.models.account import Account
from cic_ussd.requests import (get_query_parameters,
get_request_endpoint,
get_request_method,
process_pin_reset_requests,
process_locked_accounts_requests)
def test_get_query_parameters(get_request_with_params_env):
param = get_query_parameters(env=get_request_with_params_env, query_name='phone')
assert param == '0700000000'
def test_get_request_endpoint(valid_locked_accounts_env):
param = get_request_endpoint(env=valid_locked_accounts_env)
assert param == '/accounts/locked/10/10'
def test_get_request_method(valid_locked_accounts_env):
param = get_request_method(env=valid_locked_accounts_env)
assert param == 'GET'
def test_process_pin_reset_requests(uwsgi_env, create_pin_blocked_user):
env = uwsgi_env
env['REQUEST_METHOD'] = 'GET'
message, status = process_pin_reset_requests(env=env, phone_number='070000000')
assert message == 'No user matching 070000000 was found.'
assert status == '404 Not Found'
env['REQUEST_METHOD'] = 'GET'
message, status = process_pin_reset_requests(env=env, phone_number=create_pin_blocked_user.phone_number)
assert message == '{"status": "LOCKED"}'
assert status == '200 OK'
env['REQUEST_METHOD'] = 'GET'
message, status = process_pin_reset_requests(env=env, phone_number=create_pin_blocked_user.phone_number)
assert message == '{"status": "LOCKED"}'
assert status == '200 OK'
env['REQUEST_METHOD'] = 'PUT'
message, status = process_pin_reset_requests(env=env, phone_number=create_pin_blocked_user.phone_number)
assert message == f'Pin reset for user {create_pin_blocked_user.phone_number} is successful!'
assert status == '200 OK'
assert create_pin_blocked_user.get_account_status() == 'RESET'
def test_process_locked_accounts_requests(create_locked_accounts, valid_locked_accounts_env):
response, message = process_locked_accounts_requests(env=valid_locked_accounts_env)
assert message == '200 OK'
locked_account_addresses = json.loads(response)
assert len(locked_account_addresses) == 10
# check that blockchain addresses are ordered by most recently accessed
user_1 = Account.session.query(Account).filter_by(blockchain_address=locked_account_addresses[2]).first()
user_2 = Account.session.query(Account).filter_by(blockchain_address=locked_account_addresses[7]).first()
assert user_1.updated > user_2.updated

View File

@@ -1,35 +0,0 @@
# standard imports
# third-party imports
import pytest
# local imports
from cic_ussd.chain import Chain
from cic_ussd.transactions import OutgoingTransactionProcessor, truncate
def test_outgoing_transaction_processor(load_config,
create_valid_tx_recipient,
create_valid_tx_sender,
mock_outgoing_transactions):
chain_str = Chain.spec.__str__()
outgoing_tx_processor = OutgoingTransactionProcessor(
chain_str=chain_str,
from_address=create_valid_tx_sender.blockchain_address,
to_address=create_valid_tx_recipient.blockchain_address
)
outgoing_tx_processor.process_outgoing_transfer_transaction(
amount=120,
token_symbol='SRF'
)
assert mock_outgoing_transactions[0].get('amount') == 120.0
assert mock_outgoing_transactions[0].get('token_symbol') == 'SRF'
@pytest.mark.parametrize("decimals, value, expected_result",[
(3, 1234.32875, 1234.328),
(2, 98.998, 98.99)
])
def test_truncate(decimals, value, expected_result):
assert truncate(value=value, decimals=decimals).__float__() == expected_result

View File

@@ -1,6 +1,12 @@
# standard imports
# external imports
# local imports
from cic_ussd.translation import translation_for
# tests imports
def test_translation_for(set_locale_files):
english_translation = translation_for(

View File

@@ -1,14 +1,14 @@
# third party imports
# standard imports
# external imports
import pytest
# local imports
from cic_ussd.validator import (check_ip,
check_request_content_length,
check_service_code,
check_known_user,
check_request_method,
validate_phone_number,
validate_response_type)
is_valid_response,
validate_phone_number)
def test_check_ip(load_config, uwsgi_env):
@@ -19,15 +19,6 @@ def test_check_request_content_length(load_config, uwsgi_env):
assert check_request_content_length(config=load_config, env=uwsgi_env) is True
def test_check_service_code(load_config):
assert check_service_code(code='*483*46#', config=load_config) is True
def test_check_known_user(create_pending_user):
user = create_pending_user
assert check_known_user(phone=user.phone_number) is True
def test_check_request_method(uwsgi_env):
assert check_request_method(env=uwsgi_env) is True
@@ -49,5 +40,5 @@ def test_validate_phone_number(phone, expected_value):
('BIO testing', False)
])
def test_validate_response_type(response, expected_value):
assert validate_response_type(response) is expected_value
assert is_valid_response(response) is expected_value