Remove submodule cic ussd

This commit is contained in:
2021-02-06 15:13:47 +00:00
parent 8680d57a67
commit f386625844
221 changed files with 10030 additions and 4 deletions

View File

@@ -0,0 +1,21 @@
# local imports
from cic_ussd.state_machine.logic.menu import (menu_one_selected,
menu_two_selected,
menu_three_selected,
menu_four_selected)
def test_menu_selection(create_pending_user, create_in_db_ussd_session):
serialized_in_db_ussd_session = create_in_db_ussd_session.to_json()
assert menu_one_selected(('1', serialized_in_db_ussd_session, create_pending_user)) is True
assert menu_one_selected(('x', serialized_in_db_ussd_session, create_pending_user)) is False
assert menu_two_selected(('2', serialized_in_db_ussd_session, create_pending_user)) is True
assert menu_two_selected(('1', serialized_in_db_ussd_session, create_pending_user)) is False
assert menu_three_selected(('3', serialized_in_db_ussd_session, create_pending_user)) is True
assert menu_three_selected(('4', serialized_in_db_ussd_session, create_pending_user)) is False
assert menu_four_selected(('4', serialized_in_db_ussd_session, create_pending_user)) is True
assert menu_four_selected(('d', serialized_in_db_ussd_session, create_pending_user)) is False

View File

@@ -0,0 +1,94 @@
# standards imports
import json
# third party imports
import pytest
# local imports
from cic_ussd.encoder import check_password_hash, create_password_hash
from cic_ussd.state_machine.logic.pin import (complete_pin_change,
is_valid_pin,
is_valid_new_pin,
is_authorized_pin,
is_blocked_pin,
pins_match,
save_initial_pin_to_session_data)
def test_complete_pin_change(init_database, create_pending_user, create_in_db_ussd_session):
serialized_in_db_ussd_session = create_in_db_ussd_session.to_json()
state_machine_data = ('1212', serialized_in_db_ussd_session, create_pending_user)
assert create_pending_user.password_hash is None
create_in_db_ussd_session.set_data(key='initial_pin', session=init_database, value=create_password_hash('1212'))
complete_pin_change(state_machine_data)
assert create_pending_user.password_hash is not None
assert create_pending_user.verify_password(password='1212') is True
@pytest.mark.parametrize('user_input, expected', [
('4562', True),
('jksu', False),
('ij45', False),
])
def test_is_valid_pin(create_pending_user, create_in_db_ussd_session, user_input, expected):
serialized_in_db_ussd_session = create_in_db_ussd_session.to_json()
state_machine_data = (user_input, serialized_in_db_ussd_session, create_pending_user)
assert is_valid_pin(state_machine_data) is expected
@pytest.mark.parametrize('user_input, expected', [
('1212', True),
('0000', False)
])
def test_pins_match(init_database, create_pending_user, create_in_db_ussd_session, user_input, expected):
serialized_in_db_ussd_session = create_in_db_ussd_session.to_json()
state_machine_data = (user_input, serialized_in_db_ussd_session, create_pending_user)
create_in_db_ussd_session.set_data(key='initial_pin', session=init_database, value=create_password_hash(user_input))
assert pins_match(state_machine_data) is True
def test_save_initial_pin_to_session_data(create_pending_user,
create_in_redis_ussd_session,
create_in_db_ussd_session,
celery_session_worker):
serialized_in_db_ussd_session = create_in_db_ussd_session.to_json()
state_machine_data = ('1212', serialized_in_db_ussd_session, create_pending_user)
save_initial_pin_to_session_data(state_machine_data)
external_session_id = create_in_db_ussd_session.external_session_id
in_memory_ussd_session = create_in_redis_ussd_session.get(external_session_id)
in_memory_ussd_session = json.loads(in_memory_ussd_session)
assert check_password_hash(
password='1212', hashed_password=in_memory_ussd_session.get('session_data')['initial_pin'])
@pytest.mark.parametrize('user_input, expected_result', [
('1212', False),
('0000', True)
])
def test_is_authorized_pin(create_activated_user, create_in_db_ussd_session, expected_result, user_input):
serialized_in_db_ussd_session = create_in_db_ussd_session.to_json()
state_machine_data = (user_input, serialized_in_db_ussd_session, create_activated_user)
assert is_authorized_pin(state_machine_data=state_machine_data) is expected_result
def test_is_not_blocked_pin(create_activated_user, create_in_db_ussd_session):
serialized_in_db_ussd_session = create_in_db_ussd_session.to_json()
state_machine_data = ('', serialized_in_db_ussd_session, create_activated_user)
assert is_blocked_pin(state_machine_data=state_machine_data) is False
def test_is_blocked_pin(create_pin_blocked_user, create_in_db_ussd_session):
serialized_in_db_ussd_session = create_in_db_ussd_session.to_json()
alt_state_machine_data = ('user_input', serialized_in_db_ussd_session, create_pin_blocked_user)
assert is_blocked_pin(state_machine_data=alt_state_machine_data) is True
@pytest.mark.parametrize('user_input, expected_result', [
('1212', True),
('0000', False)
])
def test_is_valid_new_pin(create_activated_user, create_in_db_ussd_session, expected_result, user_input):
serialized_in_db_ussd_session = create_in_db_ussd_session.to_json()
state_machine_data = (user_input, serialized_in_db_ussd_session, create_activated_user)
assert is_valid_new_pin(state_machine_data=state_machine_data) is expected_result

View File

@@ -0,0 +1,36 @@
# standard imports
# third-party imports
import pytest
# local imports
from cic_ussd.state_machine.logic.sms import (send_terms_to_user_if_required,
process_mini_statement_request,
upsell_unregistered_recipient)
def test_send_terms_to_user_if_required(caplog,
create_in_db_ussd_session,
create_activated_user):
serialized_in_db_ussd_session = create_in_db_ussd_session.to_json()
state_machine_data = ('', serialized_in_db_ussd_session, create_activated_user)
send_terms_to_user_if_required(state_machine_data=state_machine_data)
assert 'Requires integration to cic-notify.' in caplog.text
def test_process_mini_statement_request(caplog,
create_in_db_ussd_session,
create_activated_user):
serialized_in_db_ussd_session = create_in_db_ussd_session.to_json()
state_machine_data = ('', serialized_in_db_ussd_session, create_activated_user)
process_mini_statement_request(state_machine_data=state_machine_data)
assert 'Requires integration to cic-notify.' in caplog.text
def test_upsell_unregistered_recipient(caplog,
create_in_db_ussd_session,
create_activated_user):
serialized_in_db_ussd_session = create_in_db_ussd_session.to_json()
state_machine_data = ('', serialized_in_db_ussd_session, create_activated_user)
upsell_unregistered_recipient(state_machine_data=state_machine_data)
assert 'Requires integration to cic-notify.' in caplog.text

View File

@@ -0,0 +1,112 @@
# standard imports
import json
# third-party imports
import pytest
# local imports
from cic_ussd.state_machine import UssdStateMachine
from cic_ussd.state_machine.logic.transaction import (has_sufficient_balance,
is_valid_recipient,
is_valid_transaction_amount,
process_transaction_request,
save_recipient_phone_to_session_data,
save_transaction_amount_to_session_data)
from cic_ussd.redis import InMemoryStore
@pytest.mark.parametrize("amount, expected_result", [
('50', True),
('', False)
])
def test_is_valid_transaction_amount(create_activated_user, create_in_db_ussd_session, amount, expected_result):
state_machine_data = (amount, create_in_db_ussd_session, create_activated_user)
validity = is_valid_transaction_amount(state_machine_data=state_machine_data)
assert validity == expected_result
def test_save_recipient_phone_to_session_data(create_activated_user,
create_in_db_ussd_session,
celery_session_worker,
create_in_redis_ussd_session,
init_database):
phone_number = '+254712345678'
in_memory_ussd_session = InMemoryStore.cache.get('AT974186')
in_memory_ussd_session = json.loads(in_memory_ussd_session)
assert in_memory_ussd_session.get('session_data') == {}
serialized_in_db_ussd_session = create_in_db_ussd_session.to_json()
state_machine_data = (phone_number, serialized_in_db_ussd_session, create_activated_user)
save_recipient_phone_to_session_data(state_machine_data=state_machine_data)
in_memory_ussd_session = InMemoryStore.cache.get('AT974186')
in_memory_ussd_session = json.loads(in_memory_ussd_session)
assert in_memory_ussd_session.get('session_data')['recipient_phone_number'] == phone_number
def test_save_transaction_amount_to_session_data(create_activated_user,
create_in_db_ussd_session,
celery_session_worker,
create_in_redis_ussd_session,
init_database):
transaction_amount = '100'
in_memory_ussd_session = InMemoryStore.cache.get('AT974186')
in_memory_ussd_session = json.loads(in_memory_ussd_session)
assert in_memory_ussd_session.get('session_data') == {}
serialized_in_db_ussd_session = create_in_db_ussd_session.to_json()
state_machine_data = (transaction_amount, serialized_in_db_ussd_session, create_activated_user)
save_transaction_amount_to_session_data(state_machine_data=state_machine_data)
in_memory_ussd_session = InMemoryStore.cache.get('AT974186')
in_memory_ussd_session = json.loads(in_memory_ussd_session)
assert in_memory_ussd_session.get('session_data')['transaction_amount'] == transaction_amount
@pytest.mark.parametrize("test_value, expected_result", [
('45', True),
('75', False)
])
def test_has_sufficient_balance(mock_balance,
create_in_db_ussd_session,
create_valid_tx_sender,
expected_result,
test_value):
mock_balance(60)
serialized_in_db_ussd_session = create_in_db_ussd_session.to_json()
state_machine_data = (test_value, serialized_in_db_ussd_session, create_valid_tx_sender)
result = has_sufficient_balance(state_machine_data=state_machine_data)
assert result == expected_result
@pytest.mark.parametrize("test_value, expected_result", [
('+25498765432', True),
('+25498765433', False)
])
def test_is_valid_recipient(create_in_db_ussd_session,
create_valid_tx_recipient,
create_valid_tx_sender,
expected_result,
test_value):
serialized_in_db_ussd_session = create_in_db_ussd_session.to_json()
state_machine_data = (test_value, serialized_in_db_ussd_session, create_valid_tx_sender)
result = is_valid_recipient(state_machine_data=state_machine_data)
assert result == expected_result
def test_process_transaction_request(create_valid_tx_recipient,
create_valid_tx_sender,
load_config,
mock_outgoing_transactions,
ussd_session_data):
UssdStateMachine.chain_str = load_config.get('CIC_CHAIN_SPEC')
ussd_session_data['session_data'] = {
'recipient_phone_number': create_valid_tx_recipient.phone_number,
'transaction_amount': '50'
}
state_machine_data = ('', ussd_session_data, create_valid_tx_sender)
process_transaction_request(state_machine_data=state_machine_data)
assert mock_outgoing_transactions[0].get('amount') == 50.0
assert mock_outgoing_transactions[0].get('token_symbol') == 'SRF'

View File

@@ -0,0 +1,57 @@
# standard imports
import json
# third-party-imports
import pytest
# local imports
from cic_ussd.redis import InMemoryStore
from cic_ussd.state_machine.logic.user import (
change_preferred_language_to_en,
change_preferred_language_to_sw,
save_profile_attribute_to_session_data,
update_account_status_to_active)
def test_change_preferred_language(create_pending_user, create_in_db_ussd_session):
state_machine_data = ('', create_in_db_ussd_session, create_pending_user)
assert create_pending_user.preferred_language is None
change_preferred_language_to_en(state_machine_data)
assert create_pending_user.preferred_language == 'en'
change_preferred_language_to_sw(state_machine_data)
assert create_pending_user.preferred_language == 'sw'
def test_update_account_status_to_active(create_pending_user, create_in_db_ussd_session):
state_machine_data = ('', create_in_db_ussd_session, create_pending_user)
update_account_status_to_active(state_machine_data)
assert create_pending_user.get_account_status() == 'ACTIVE'
@pytest.mark.parametrize("current_state, expected_key, expected_result, user_input", [
("enter_first_name", "first_name", "John", "John"),
("enter_last_name", "last_name", "Doe", "Doe"),
("enter_location", "location", "Kangemi", "Kangemi"),
("enter_business_profile", "business_profile", "Mandazi", "Mandazi")
])
def test_save_profile_attribute_to_session_data(current_state,
expected_key,
expected_result,
user_input,
celery_session_worker,
create_activated_user,
create_in_db_ussd_session,
create_in_redis_ussd_session):
create_in_db_ussd_session.state = current_state
serialized_in_db_ussd_session = create_in_db_ussd_session.to_json()
state_machine_data = (user_input, serialized_in_db_ussd_session, create_activated_user)
in_memory_ussd_session = InMemoryStore.cache.get('AT974186')
in_memory_ussd_session = json.loads(in_memory_ussd_session)
assert in_memory_ussd_session.get('session_data') == {}
serialized_in_db_ussd_session['state'] = current_state
save_profile_attribute_to_session_data(state_machine_data=state_machine_data)
in_memory_ussd_session = InMemoryStore.cache.get('AT974186')
in_memory_ussd_session = json.loads(in_memory_ussd_session)
assert in_memory_ussd_session.get('session_data')[expected_key] == expected_result

View File

@@ -0,0 +1,67 @@
# standard imports
# third-party imports
import pytest
# local imports
from cic_ussd.state_machine.logic.validator import (is_valid_name,
has_complete_profile_data,
has_empty_username_data,
has_empty_gender_data,
has_empty_location_data,
has_empty_business_profile_data)
@pytest.mark.parametrize("user_input, expected_result", [
("Arya", True),
("1234", False)
])
def test_is_valid_name(create_in_db_ussd_session, create_pending_user, user_input, expected_result):
serialized_in_db_ussd_session = create_in_db_ussd_session.to_json()
state_machine_data = (user_input, serialized_in_db_ussd_session, create_pending_user)
result = is_valid_name(state_machine_data=state_machine_data)
assert result is expected_result
def test_has_complete_profile_data(caplog,
create_in_db_ussd_session,
create_activated_user):
serialized_in_db_ussd_session = create_in_db_ussd_session.to_json()
state_machine_data = ('', serialized_in_db_ussd_session, create_activated_user)
has_complete_profile_data(state_machine_data=state_machine_data)
assert 'This section requires implementation of user metadata.' in caplog.text
def test_has_empty_username_data(caplog,
create_in_db_ussd_session,
create_activated_user):
state_machine_data = ('', create_in_db_ussd_session, create_activated_user)
has_empty_username_data(state_machine_data=state_machine_data)
assert 'This section requires implementation of user metadata.' in caplog.text
def test_has_empty_gender_data(caplog,
create_in_db_ussd_session,
create_activated_user):
serialized_in_db_ussd_session = create_in_db_ussd_session.to_json()
state_machine_data = ('', serialized_in_db_ussd_session, create_activated_user)
has_empty_gender_data(state_machine_data=state_machine_data)
assert 'This section requires implementation of user metadata.' in caplog.text
def test_has_empty_location_data(caplog,
create_in_db_ussd_session,
create_activated_user):
serialized_in_db_ussd_session = create_in_db_ussd_session.to_json()
state_machine_data = ('', serialized_in_db_ussd_session, create_activated_user)
has_empty_location_data(state_machine_data=state_machine_data)
assert 'This section requires implementation of user metadata.' in caplog.text
def test_has_empty_business_profile_data(caplog,
create_in_db_ussd_session,
create_activated_user):
serialized_in_db_ussd_session = create_in_db_ussd_session.to_json()
state_machine_data = ('', serialized_in_db_ussd_session, create_activated_user)
has_empty_business_profile_data(state_machine_data=state_machine_data)
assert 'This section requires implementation of user metadata.' in caplog.text