cic-internal-integration/apps/cic-ussd/cic_ussd/state_machine/logic/pin.py

151 lines
6.0 KiB
Python
Raw Permalink Normal View History

2021-02-06 16:13:47 +01:00
"""This module defines functions responsible for creation, validation, reset and any other manipulations on the
user's pin.
"""
# standard imports
import json
import logging
import re
from typing import Tuple
# third party imports
2021-06-29 12:49:25 +02:00
from sqlalchemy.orm.session import Session
2021-02-06 16:13:47 +01:00
# local imports
2021-06-29 12:49:25 +02:00
from cic_ussd.db.models.account import Account
from cic_ussd.db.models.base import SessionBase
from cic_ussd.db.enum import AccountStatus
from cic_ussd.encoder import create_password_hash, check_password_hash
2021-02-06 16:13:47 +01:00
from cic_ussd.operations import persist_session_to_db_task, create_or_update_session
from cic_ussd.redis import InMemoryStore
logg = logging.getLogger(__file__)
2021-06-29 12:49:25 +02:00
def is_valid_pin(state_machine_data: Tuple[str, dict, Account, Session]) -> bool:
2021-02-06 16:13:47 +01:00
"""This function checks a pin's validity by ensuring it has a length of for characters and the characters are
numeric.
:param state_machine_data: A tuple containing user input, a ussd session and user object.
:type state_machine_data: tuple
:return: A pin's validity
:rtype: bool
"""
2021-06-29 12:49:25 +02:00
user_input, ussd_session, user, session = state_machine_data
2021-02-06 16:13:47 +01:00
pin_is_valid = False
matcher = r'^\d{4}$'
if re.match(matcher, user_input):
pin_is_valid = True
return pin_is_valid
2021-06-29 12:49:25 +02:00
def is_authorized_pin(state_machine_data: Tuple[str, dict, Account, Session]) -> bool:
2021-02-06 16:13:47 +01:00
"""This function checks whether the user input confirming a specific pin matches the initial pin entered.
:param state_machine_data: A tuple containing user input, a ussd session and user object.
:type state_machine_data: tuple
:return: A match between two pin values.
:rtype: bool
"""
2021-06-29 12:49:25 +02:00
user_input, ussd_session, user, session = state_machine_data
2021-02-06 16:13:47 +01:00
return user.verify_password(password=user_input)
2021-06-29 12:49:25 +02:00
def is_locked_account(state_machine_data: Tuple[str, dict, Account, Session]) -> bool:
2021-02-06 16:13:47 +01:00
"""This function checks whether a user's account is locked due to too many failed attempts.
:param state_machine_data: A tuple containing user input, a ussd session and user object.
:type state_machine_data: tuple
:return: A match between two pin values.
:rtype: bool
"""
2021-06-29 12:49:25 +02:00
user_input, ussd_session, user, session = state_machine_data
2021-02-06 16:13:47 +01:00
return user.get_account_status() == AccountStatus.LOCKED.name
2021-06-29 12:49:25 +02:00
def save_initial_pin_to_session_data(state_machine_data: Tuple[str, dict, Account, Session]):
2021-02-06 16:13:47 +01:00
"""This function hashes a pin and stores it in session data.
:param state_machine_data: A tuple containing user input, a ussd session and user object.
:type state_machine_data: tuple
"""
2021-06-29 12:49:25 +02:00
user_input, ussd_session, user, session = state_machine_data
2021-02-06 16:13:47 +01:00
# define redis cache entry point
cache = InMemoryStore.cache
# get external session id
external_session_id = ussd_session.get('external_session_id')
# get corresponding session record
in_redis_ussd_session = cache.get(external_session_id)
in_redis_ussd_session = json.loads(in_redis_ussd_session)
# set initial pin data
initial_pin = create_password_hash(user_input)
2021-06-03 15:40:51 +02:00
if ussd_session.get('session_data'):
session_data = ussd_session.get('session_data')
session_data['initial_pin'] = initial_pin
else:
session_data = {
'initial_pin': initial_pin
}
2021-02-06 16:13:47 +01:00
# create new in memory ussd session with current ussd session data
create_or_update_session(
external_session_id=external_session_id,
phone=in_redis_ussd_session.get('msisdn'),
service_code=in_redis_ussd_session.get('service_code'),
user_input=user_input,
current_menu=in_redis_ussd_session.get('state'),
2021-06-29 12:49:25 +02:00
session=session,
2021-02-06 16:13:47 +01:00
session_data=session_data
)
persist_session_to_db_task(external_session_id=external_session_id, queue='cic-ussd')
2021-06-29 12:49:25 +02:00
def pins_match(state_machine_data: Tuple[str, dict, Account, Session]) -> bool:
2021-02-06 16:13:47 +01:00
"""This function checks whether the user input confirming a specific pin matches the initial pin entered.
:param state_machine_data: A tuple containing user input, a ussd session and user object.
:type state_machine_data: tuple
:return: A match between two pin values.
:rtype: bool
"""
2021-06-29 12:49:25 +02:00
user_input, ussd_session, user, session = state_machine_data
2021-02-06 16:13:47 +01:00
initial_pin = ussd_session.get('session_data').get('initial_pin')
2021-06-03 15:40:51 +02:00
return check_password_hash(user_input, initial_pin)
2021-02-06 16:13:47 +01:00
2021-06-29 12:49:25 +02:00
def complete_pin_change(state_machine_data: Tuple[str, dict, Account, Session]):
2021-02-06 16:13:47 +01:00
"""This function persists the user's pin to the database
:param state_machine_data: A tuple containing user input, a ussd session and user object.
:type state_machine_data: tuple
"""
2021-06-29 12:49:25 +02:00
user_input, ussd_session, user, session = state_machine_data
session = SessionBase.bind_session(session=session)
2021-02-06 16:13:47 +01:00
password_hash = ussd_session.get('session_data').get('initial_pin')
user.password_hash = password_hash
2021-06-29 12:49:25 +02:00
session.add(user)
session.flush()
SessionBase.release_session(session=session)
2021-02-06 16:13:47 +01:00
2021-06-29 12:49:25 +02:00
def is_blocked_pin(state_machine_data: Tuple[str, dict, Account, Session]) -> bool:
2021-02-06 16:13:47 +01:00
"""This function checks whether the user input confirming a specific pin matches the initial pin entered.
:param state_machine_data: A tuple containing user input, a ussd session and user object.
:type state_machine_data: tuple
:return: A match between two pin values.
:rtype: bool
"""
2021-06-29 12:49:25 +02:00
user_input, ussd_session, user, session = state_machine_data
2021-02-06 16:13:47 +01:00
return user.get_account_status() == AccountStatus.LOCKED.name
2021-06-29 12:49:25 +02:00
def is_valid_new_pin(state_machine_data: Tuple[str, dict, Account, Session]) -> bool:
2021-02-06 16:13:47 +01:00
"""This function checks whether the user's new pin is a valid pin and that it isn't the same as the old one.
:param state_machine_data: A tuple containing user input, a ussd session and user object.
:type state_machine_data: tuple
:return: A match between two pin values.
:rtype: bool
"""
2021-06-29 12:49:25 +02:00
user_input, ussd_session, user, session = state_machine_data
2021-02-06 16:13:47 +01:00
is_old_pin = user.verify_password(password=user_input)
return is_valid_pin(state_machine_data=state_machine_data) and not is_old_pin