cic-internal-integration/apps/cic-ussd/cic_ussd/state_machine/logic/pin.py

152 lines
5.8 KiB
Python

"""This module defines functions responsible for creation, validation, reset and any other manipulations on the
user's pin.
"""
# standard imports
import json
import logging
import re
from typing import Tuple
# third party imports
import bcrypt
# local imports
from cic_ussd.db.models.account import AccountStatus, Account
from cic_ussd.encoder import PasswordEncoder, create_password_hash
from cic_ussd.operations import persist_session_to_db_task, create_or_update_session
from cic_ussd.redis import InMemoryStore
logg = logging.getLogger(__file__)
def is_valid_pin(state_machine_data: Tuple[str, dict, Account]) -> bool:
"""This function checks a pin's validity by ensuring it has a length of for characters and the characters are
numeric.
:param state_machine_data: A tuple containing user input, a ussd session and user object.
:type state_machine_data: tuple
:return: A pin's validity
:rtype: bool
"""
user_input, ussd_session, user = state_machine_data
pin_is_valid = False
matcher = r'^\d{4}$'
if re.match(matcher, user_input):
pin_is_valid = True
return pin_is_valid
def is_authorized_pin(state_machine_data: Tuple[str, dict, Account]) -> bool:
"""This function checks whether the user input confirming a specific pin matches the initial pin entered.
:param state_machine_data: A tuple containing user input, a ussd session and user object.
:type state_machine_data: tuple
:return: A match between two pin values.
:rtype: bool
"""
user_input, ussd_session, user = state_machine_data
pin_validity = user.verify_password(password=user_input)
if pin_validity is True:
return user.verify_password(password=user_input)
else:
# bump number for failed attempts
user.failed_pin_attempts += 1
Account.session.add(user)
Account.session.commit()
return pin_validity
def is_locked_account(state_machine_data: Tuple[str, dict, Account]) -> bool:
"""This function checks whether a user's account is locked due to too many failed attempts.
:param state_machine_data: A tuple containing user input, a ussd session and user object.
:type state_machine_data: tuple
:return: A match between two pin values.
:rtype: bool
"""
user_input, ussd_session, user = state_machine_data
return user.get_account_status() == AccountStatus.LOCKED.name
def save_initial_pin_to_session_data(state_machine_data: Tuple[str, dict, Account]):
"""This function hashes a pin and stores it in session data.
:param state_machine_data: A tuple containing user input, a ussd session and user object.
:type state_machine_data: tuple
"""
user_input, ussd_session, user = state_machine_data
# define redis cache entry point
cache = InMemoryStore.cache
# get external session id
external_session_id = ussd_session.get('external_session_id')
# get corresponding session record
in_redis_ussd_session = cache.get(external_session_id)
in_redis_ussd_session = json.loads(in_redis_ussd_session)
# set initial pin data
initial_pin = create_password_hash(user_input)
session_data = {
'initial_pin': initial_pin
}
# create new in memory ussd session with current ussd session data
create_or_update_session(
external_session_id=external_session_id,
phone=in_redis_ussd_session.get('msisdn'),
service_code=in_redis_ussd_session.get('service_code'),
user_input=user_input,
current_menu=in_redis_ussd_session.get('state'),
session_data=session_data
)
persist_session_to_db_task(external_session_id=external_session_id, queue='cic-ussd')
def pins_match(state_machine_data: Tuple[str, dict, Account]) -> bool:
"""This function checks whether the user input confirming a specific pin matches the initial pin entered.
:param state_machine_data: A tuple containing user input, a ussd session and user object.
:type state_machine_data: tuple
:return: A match between two pin values.
:rtype: bool
"""
user_input, ussd_session, user = state_machine_data
initial_pin = ussd_session.get('session_data').get('initial_pin')
fernet = PasswordEncoder(PasswordEncoder.key)
initial_pin = fernet.decrypt(initial_pin.encode())
return bcrypt.checkpw(user_input.encode(), initial_pin)
def complete_pin_change(state_machine_data: Tuple[str, dict, Account]):
"""This function persists the user's pin to the database
:param state_machine_data: A tuple containing user input, a ussd session and user object.
:type state_machine_data: tuple
"""
user_input, ussd_session, user = state_machine_data
password_hash = ussd_session.get('session_data').get('initial_pin')
user.password_hash = password_hash
Account.session.add(user)
Account.session.commit()
def is_blocked_pin(state_machine_data: Tuple[str, dict, Account]) -> bool:
"""This function checks whether the user input confirming a specific pin matches the initial pin entered.
:param state_machine_data: A tuple containing user input, a ussd session and user object.
:type state_machine_data: tuple
:return: A match between two pin values.
:rtype: bool
"""
user_input, ussd_session, user = state_machine_data
return user.get_account_status() == AccountStatus.LOCKED.name
def is_valid_new_pin(state_machine_data: Tuple[str, dict, Account]) -> bool:
"""This function checks whether the user's new pin is a valid pin and that it isn't the same as the old one.
:param state_machine_data: A tuple containing user input, a ussd session and user object.
:type state_machine_data: tuple
:return: A match between two pin values.
:rtype: bool
"""
user_input, ussd_session, user = state_machine_data
is_old_pin = user.verify_password(password=user_input)
return is_valid_pin(state_machine_data=state_machine_data) and not is_old_pin