Minor refactors:
- Renames s_assemble to s_brief - Link s_local to s_brief
This commit is contained in:
@@ -0,0 +1,28 @@
|
||||
import pytest
|
||||
|
||||
|
||||
def test_single_head_revision(alembic_runner):
|
||||
heads = alembic_runner.heads
|
||||
head_count = len(heads)
|
||||
assert head_count == 1
|
||||
|
||||
|
||||
def test_upgrade(alembic_runner):
|
||||
try:
|
||||
alembic_runner.migrate_up_to("head")
|
||||
except RuntimeError:
|
||||
pytest.fail('Failed to upgrade to the head revision.')
|
||||
|
||||
|
||||
def test_up_down_consistency(alembic_runner):
|
||||
try:
|
||||
for revision in alembic_runner.history.revisions:
|
||||
alembic_runner.migrate_up_to(revision)
|
||||
except RuntimeError:
|
||||
pytest.fail('Failed to upgrade through each revision individually.')
|
||||
|
||||
try:
|
||||
for revision in reversed(alembic_runner.history.revisions):
|
||||
alembic_runner.migrate_down_to(revision)
|
||||
except RuntimeError:
|
||||
pytest.fail('Failed to downgrade through each revision individually.')
|
||||
14
apps/cic-ussd/tests/cic_ussd/db/models/test_task_tracker.py
Normal file
14
apps/cic-ussd/tests/cic_ussd/db/models/test_task_tracker.py
Normal file
@@ -0,0 +1,14 @@
|
||||
# local imports
|
||||
from cic_ussd.db.models.task_tracker import TaskTracker
|
||||
|
||||
|
||||
def test_task_tracker(init_database):
|
||||
task_uuid = '31e85315-feee-4b6d-995e-223569082cc4'
|
||||
task_in_tracker = TaskTracker(task_uuid=task_uuid)
|
||||
|
||||
session = init_database
|
||||
session.add(task_in_tracker)
|
||||
session.commit()
|
||||
|
||||
queried_task = session.query(TaskTracker).get(1)
|
||||
assert queried_task.task_uuid == task_uuid
|
||||
40
apps/cic-ussd/tests/cic_ussd/db/models/test_user.py
Normal file
40
apps/cic-ussd/tests/cic_ussd/db/models/test_user.py
Normal file
@@ -0,0 +1,40 @@
|
||||
"""Tests the persistence of the user record and associated functions to the user object"""
|
||||
|
||||
# standard imports
|
||||
import pytest
|
||||
|
||||
# platform imports
|
||||
from cic_ussd.db.models.user import User
|
||||
|
||||
|
||||
def test_user(init_database, set_fernet_key):
|
||||
user = User(blockchain_address='0x417f5962fc52dc33ff0689659b25848680dec6dcedc6785b03d1df60fc6d5c51',
|
||||
phone_number='+254700000000')
|
||||
user.create_password('0000')
|
||||
|
||||
session = User.session
|
||||
session.add(user)
|
||||
session.commit()
|
||||
|
||||
queried_user = session.query(User).get(1)
|
||||
assert queried_user.blockchain_address == '0x417f5962fc52dc33ff0689659b25848680dec6dcedc6785b03d1df60fc6d5c51'
|
||||
assert queried_user.phone_number == '+254700000000'
|
||||
assert queried_user.failed_pin_attempts == 0
|
||||
assert queried_user.verify_password('0000') is True
|
||||
|
||||
|
||||
def test_user_state_transition(create_pending_user):
|
||||
user = create_pending_user
|
||||
session = User.session
|
||||
|
||||
assert user.get_account_status() == 'PENDING'
|
||||
user.activate_account()
|
||||
assert user.get_account_status() == 'ACTIVE'
|
||||
user.failed_pin_attempts = 3
|
||||
assert user.get_account_status() == 'LOCKED'
|
||||
user.reset_account_pin()
|
||||
assert user.get_account_status() == 'RESET'
|
||||
user.activate_account()
|
||||
assert user.get_account_status() == 'ACTIVE'
|
||||
session.add(user)
|
||||
session.commit()
|
||||
54
apps/cic-ussd/tests/cic_ussd/db/models/test_ussd_session.py
Normal file
54
apps/cic-ussd/tests/cic_ussd/db/models/test_ussd_session.py
Normal file
@@ -0,0 +1,54 @@
|
||||
# third party imports
|
||||
import pytest
|
||||
|
||||
# local imports
|
||||
from cic_ussd.db.models.ussd_session import UssdSession
|
||||
from cic_ussd.error import VersionTooLowError
|
||||
|
||||
|
||||
def test_ussd_session(init_database, create_in_redis_ussd_session, create_activated_user):
|
||||
session = init_database
|
||||
|
||||
ussd_session = UssdSession(
|
||||
external_session_id='AT65423',
|
||||
service_code='*123#',
|
||||
msisdn=create_activated_user.phone_number,
|
||||
user_input='1',
|
||||
state='start',
|
||||
session_data={},
|
||||
version=1,
|
||||
)
|
||||
|
||||
session.add(ussd_session)
|
||||
session.commit()
|
||||
|
||||
ussd_session.set_data(key='foo', session=init_database, value='bar')
|
||||
|
||||
assert ussd_session.get_data('foo') == 'bar'
|
||||
ussd_session.update(
|
||||
session=init_database,
|
||||
user_input='3',
|
||||
state='next',
|
||||
version=2
|
||||
)
|
||||
assert ussd_session.version == 2
|
||||
session.add(ussd_session)
|
||||
session.commit()
|
||||
|
||||
assert UssdSession.have_session_for_phone(create_activated_user.phone_number) is True
|
||||
|
||||
|
||||
def test_version_too_low_error(init_database, create_in_redis_ussd_session, create_activated_user):
|
||||
with pytest.raises(VersionTooLowError) as e:
|
||||
session = UssdSession(
|
||||
external_session_id='AT38745',
|
||||
service_code='*123#',
|
||||
msisdn=create_activated_user.phone_number,
|
||||
user_input='1',
|
||||
state='start',
|
||||
session_data={},
|
||||
version=3,
|
||||
)
|
||||
assert session.check_version(1)
|
||||
assert session.check_version(3)
|
||||
assert str(e.value) == 'New session version number is not greater than last saved version!'
|
||||
30
apps/cic-ussd/tests/cic_ussd/db/test_db.py
Normal file
30
apps/cic-ussd/tests/cic_ussd/db/test_db.py
Normal file
@@ -0,0 +1,30 @@
|
||||
# standard imports
|
||||
import os
|
||||
|
||||
# third-party imports
|
||||
|
||||
# local imports
|
||||
from cic_ussd.db import dsn_from_config
|
||||
|
||||
|
||||
def test_dsn_from_config(load_config):
|
||||
"""
|
||||
"""
|
||||
# test dsn for sqlite engine
|
||||
dsn = dsn_from_config(load_config)
|
||||
scheme = f'{load_config.get("DATABASE_ENGINE")}+{load_config.get("DATABASE_DRIVER")}'
|
||||
assert dsn == f'{scheme}:///{load_config.get("DATABASE_NAME")}'
|
||||
|
||||
# test dsn for other db formats
|
||||
overrides = {
|
||||
'DATABASE_PASSWORD': 'password',
|
||||
'DATABASE_DRIVER': 'psycopg2',
|
||||
'DATABASE_ENGINE': 'postgresql'
|
||||
}
|
||||
load_config.dict_override(dct=overrides, dct_description='Override values to test different db formats.')
|
||||
|
||||
scheme = f'{load_config.get("DATABASE_ENGINE")}+{load_config.get("DATABASE_DRIVER")}'
|
||||
|
||||
dsn = dsn_from_config(load_config)
|
||||
assert dsn == f"{scheme}://{load_config.get('DATABASE_USER')}:{load_config.get('DATABASE_PASSWORD')}@{load_config.get('DATABASE_HOST')}:{load_config.get('DATABASE_PORT')}/{load_config.get('DATABASE_NAME')}"
|
||||
|
||||
Reference in New Issue
Block a user