"""This module defines functions responsible for creation, validation, reset and any other manipulations on the user's pin. """ # standard imports import logging import re from typing import Tuple # third party imports from sqlalchemy.orm.session import Session # local imports from cic_ussd.db.models.account import Account from cic_ussd.db.models.base import SessionBase from cic_ussd.db.enum import AccountStatus from cic_ussd.encoder import create_password_hash, check_password_hash from cic_ussd.processor.util import wait_for_session_data from cic_ussd.session.ussd_session import create_or_update_session, persist_ussd_session logg = logging.getLogger(__file__) def is_valid_pin(state_machine_data: Tuple[str, dict, Account, Session]) -> bool: """This function checks a pin's validity by ensuring it has a length of for characters and the characters are numeric. :param state_machine_data: A tuple containing user input, a ussd session and user object. :type state_machine_data: tuple :return: A pin's validity :rtype: bool """ user_input, ussd_session, account, session = state_machine_data matcher = r'^\d{4}$' return bool(re.match(matcher, user_input)) def is_authorized_pin(state_machine_data: Tuple[str, dict, Account, Session]) -> bool: """This function checks whether the user input confirming a specific pin matches the initial pin entered. :param state_machine_data: A tuple containing user input, a ussd session and user object. :type state_machine_data: tuple :return: A match between two pin values. :rtype: bool """ user_input, ussd_session, account, session = state_machine_data is_verified_password = account.verify_password(password=user_input) if not is_verified_password: account.failed_pin_attempts += 1 return is_verified_password def is_locked_account(state_machine_data: Tuple[str, dict, Account, Session]) -> bool: """This function checks whether a user's account is locked due to too many failed attempts. :param state_machine_data: A tuple containing user input, a ussd session and user object. :type state_machine_data: tuple :return: A match between two pin values. :rtype: bool """ user_input, ussd_session, account, session = state_machine_data return account.get_status(session) == AccountStatus.LOCKED.name def save_initial_pin_to_session_data(state_machine_data: Tuple[str, dict, Account, Session]): """This function hashes a pin and stores it in session data. :param state_machine_data: A tuple containing user input, a ussd session and user object. :type state_machine_data: tuple """ user_input, ussd_session, account, session = state_machine_data initial_pin = create_password_hash(user_input) if ussd_session.get('data'): data = ussd_session.get('data') data['initial_pin'] = initial_pin else: data = { 'initial_pin': initial_pin } external_session_id = ussd_session.get('external_session_id') create_or_update_session( external_session_id=external_session_id, msisdn=ussd_session.get('msisdn'), service_code=ussd_session.get('service_code'), user_input=user_input, state=ussd_session.get('state'), session=session, data=data ) persist_ussd_session(external_session_id, 'cic-ussd') def pins_match(state_machine_data: Tuple[str, dict, Account, Session]) -> bool: """This function checks whether the user input confirming a specific pin matches the initial pin entered. :param state_machine_data: A tuple containing user input, a ussd session and user object. :type state_machine_data: tuple :return: A match between two pin values. :rtype: bool """ user_input, ussd_session, account, session = state_machine_data wait_for_session_data('Initial pin', session_data_key='initial_pin', ussd_session=ussd_session) initial_pin = ussd_session.get('data').get('initial_pin') return check_password_hash(user_input, initial_pin) def complete_pin_change(state_machine_data: Tuple[str, dict, Account, Session]): """This function persists the user's pin to the database :param state_machine_data: A tuple containing user input, a ussd session and user object. :type state_machine_data: tuple """ user_input, ussd_session, account, session = state_machine_data session = SessionBase.bind_session(session=session) wait_for_session_data('Initial pin', session_data_key='initial_pin', ussd_session=ussd_session) password_hash = ussd_session.get('data').get('initial_pin') account.password_hash = password_hash session.add(account) session.flush() SessionBase.release_session(session=session) def is_blocked_pin(state_machine_data: Tuple[str, dict, Account, Session]) -> bool: """This function checks whether the user input confirming a specific pin matches the initial pin entered. :param state_machine_data: A tuple containing user input, a ussd session and user object. :type state_machine_data: tuple :return: A match between two pin values. :rtype: bool """ user_input, ussd_session, account, session = state_machine_data return account.get_status(session) == AccountStatus.LOCKED.name def is_valid_new_pin(state_machine_data: Tuple[str, dict, Account, Session]) -> bool: """This function checks whether the user's new pin is a valid pin and that it isn't the same as the old one. :param state_machine_data: A tuple containing user input, a ussd session and user object. :type state_machine_data: tuple :return: A match between two pin values. :rtype: bool """ user_input, ussd_session, account, session = state_machine_data is_old_pin = account.verify_password(password=user_input) return is_valid_pin(state_machine_data=state_machine_data) and not is_old_pin