From eb2f71aee0f48ea89db39f4c232388c20df7e7e8 Mon Sep 17 00:00:00 2001 From: PhilipWafula Date: Mon, 14 Jun 2021 10:38:23 +0300 Subject: [PATCH 1/6] Enable api level setting of queue value. --- apps/cic-notify/cic_notify/api.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/apps/cic-notify/cic_notify/api.py b/apps/cic-notify/cic_notify/api.py index f879a6ee..fb27f7bd 100644 --- a/apps/cic-notify/cic_notify/api.py +++ b/apps/cic-notify/cic_notify/api.py @@ -26,7 +26,7 @@ def get_sms_queue_tasks(app, task_prefix='cic_notify.tasks.sms.'): for q in qs[host]: if re.match(re_q, q['name']): host_queues.append((host, q['name'],)) - + task_prefix_len = len(task_prefix) queue_tasks = [] for (host, queue) in host_queues: @@ -35,17 +35,18 @@ def get_sms_queue_tasks(app, task_prefix='cic_notify.tasks.sms.'): for task in tasks: if len(task) >= task_prefix_len and task[:task_prefix_len] == task_prefix: queue_tasks.append((queue, task,)) - + return queue_tasks class Api: # TODO: Implement callback strategy - def __init__(self, queue='cic-notify'): + def __init__(self, queue=None): """ :param queue: The queue on which to execute notification tasks :type queue: str """ + self.queue = queue self.sms_tasks = get_sms_queue_tasks(app) logg.debug('sms tasks {}'.format(self.sms_tasks)) @@ -61,13 +62,19 @@ class Api: """ signatures = [] for q in self.sms_tasks: + + if not self.queue: + queue = q[0] + else: + queue = self.queue + signature = celery.signature( q[1], [ message, recipient, ], - queue=q[0], + queue=queue, ) signatures.append(signature) From 0bf2c35fcd5a3d56796ee37fa37b0053fc8b0ece Mon Sep 17 00:00:00 2001 From: Blair Vanderlugt Date: Sat, 26 Jun 2021 09:50:03 -0700 Subject: [PATCH 2/6] add key store and vars --- apps/data-seeding/cic_eth/import_balance.py | 1 + apps/data-seeding/cic_ussd/import_balance.py | 1 + apps/data-seeding/config/app.ini | 3 +++ apps/data-seeding/eth/import_balance.py | 1 + apps/data-seeding/eth/import_users.py | 1 + ...-18-44.521011372Z--eb3907ecad74a0013c259d5874ae7f22dcbcc95c | 1 + 6 files changed, 8 insertions(+) create mode 100644 apps/data-seeding/keystore/UTC--2021-01-08T17-18-44.521011372Z--eb3907ecad74a0013c259d5874ae7f22dcbcc95c diff --git a/apps/data-seeding/cic_eth/import_balance.py b/apps/data-seeding/cic_eth/import_balance.py index 97c75d60..904299a1 100644 --- a/apps/data-seeding/cic_eth/import_balance.py +++ b/apps/data-seeding/cic_eth/import_balance.py @@ -76,6 +76,7 @@ args_override = { 'CIC_CHAIN_SPEC': getattr(args, 'i'), 'ETH_PROVIDER': getattr(args, 'p'), 'CIC_REGISTRY_ADDRESS': getattr(args, 'r'), + 'KEYSTORE_FILE_PATH': getattr(args, 'key-file') } config.dict_override(args_override, 'cli flag') config.censor('PASSWORD', 'DATABASE') diff --git a/apps/data-seeding/cic_ussd/import_balance.py b/apps/data-seeding/cic_ussd/import_balance.py index 5383bfd4..19315b11 100644 --- a/apps/data-seeding/cic_ussd/import_balance.py +++ b/apps/data-seeding/cic_ussd/import_balance.py @@ -70,6 +70,7 @@ args_override = { 'REDIS_DB': getattr(args, 'redis_db'), 'META_HOST': getattr(args, 'meta_host'), 'META_PORT': getattr(args, 'meta_port'), + 'KEYSTORE_FILE_PATH': getattr(args, 'key-file') } config.dict_override(args_override, 'cli flag') config.censor('PASSWORD', 'DATABASE') diff --git a/apps/data-seeding/config/app.ini b/apps/data-seeding/config/app.ini index 6d37c421..82cd4f1d 100644 --- a/apps/data-seeding/config/app.ini +++ b/apps/data-seeding/config/app.ini @@ -22,3 +22,6 @@ TRANSITIONS=/usr/src/cic-ussd/transitions/ host = port = ssl = + +[keystore] +file_path = keystore/UTC--2021-01-08T17-18-44.521011372Z--eb3907ecad74a0013c259d5874ae7f22dcbcc95c diff --git a/apps/data-seeding/eth/import_balance.py b/apps/data-seeding/eth/import_balance.py index b3fb1112..d391d021 100644 --- a/apps/data-seeding/eth/import_balance.py +++ b/apps/data-seeding/eth/import_balance.py @@ -75,6 +75,7 @@ args_override = { 'CIC_CHAIN_SPEC': getattr(args, 'i'), 'ETH_PROVIDER': getattr(args, 'p'), 'CIC_REGISTRY_ADDRESS': getattr(args, 'r'), + 'KEYSTORE_FILE_PATH': getattr(args, 'key-file') } config.dict_override(args_override, 'cli flag') config.censor('PASSWORD', 'DATABASE') diff --git a/apps/data-seeding/eth/import_users.py b/apps/data-seeding/eth/import_users.py index 3a88b86d..8ef9fd84 100644 --- a/apps/data-seeding/eth/import_users.py +++ b/apps/data-seeding/eth/import_users.py @@ -59,6 +59,7 @@ config.process() args_override = { 'CIC_REGISTRY_ADDRESS': getattr(args, 'r'), 'CIC_CHAIN_SPEC': getattr(args, 'i'), + 'KEYSTORE_FILE_PATH': getattr(args, 'key-file') } config.dict_override(args_override, 'cli') config.add(args.user_dir, '_USERDIR', True) diff --git a/apps/data-seeding/keystore/UTC--2021-01-08T17-18-44.521011372Z--eb3907ecad74a0013c259d5874ae7f22dcbcc95c b/apps/data-seeding/keystore/UTC--2021-01-08T17-18-44.521011372Z--eb3907ecad74a0013c259d5874ae7f22dcbcc95c new file mode 100644 index 00000000..2b843ec6 --- /dev/null +++ b/apps/data-seeding/keystore/UTC--2021-01-08T17-18-44.521011372Z--eb3907ecad74a0013c259d5874ae7f22dcbcc95c @@ -0,0 +1 @@ +{"address":"eb3907ecad74a0013c259d5874ae7f22dcbcc95c","crypto":{"cipher":"aes-128-ctr","ciphertext":"b0f70a8af4071faff2267374e2423cbc7a71012096fd2215866d8de7445cc215","cipherparams":{"iv":"9ac89383a7793226446dcb7e1b45cdf3"},"kdf":"scrypt","kdfparams":{"dklen":32,"n":262144,"p":1,"r":8,"salt":"299f7b5df1d08a0a7b7f9c9eb44fe4798683b78da3513fcf9603fd913ab3336f"},"mac":"6f4ed36c11345a9a48353cd2f93f1f92958c96df15f3112a192bc994250e8d03"},"id":"61a9dd88-24a9-495c-9a51-152bd1bfaa5b","version":3} \ No newline at end of file From e59a71188c26ebc0a384a7af3010459cb9306442 Mon Sep 17 00:00:00 2001 From: nolash Date: Mon, 28 Jun 2021 17:18:52 +0200 Subject: [PATCH 3/6] Downgrade chainsyncer --- apps/cic-eth/requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/cic-eth/requirements.txt b/apps/cic-eth/requirements.txt index b601639a..631f384e 100644 --- a/apps/cic-eth/requirements.txt +++ b/apps/cic-eth/requirements.txt @@ -16,7 +16,7 @@ moolb~=0.1.1b2 eth-address-index~=0.1.1a11 chainlib~=0.0.3rc2 hexathon~=0.0.1a7 -chainsyncer[sql]~=0.0.2a5 +chainsyncer[sql]==0.0.2a5 chainqueue~=0.0.2b3 sarafu-faucet~=0.0.3a3 erc20-faucet~=0.2.1a5 From a252195bdc847a825684e7aec434cc2a71b5fbc5 Mon Sep 17 00:00:00 2001 From: Blair Vanderlugt Date: Mon, 28 Jun 2021 10:56:15 -0700 Subject: [PATCH 4/6] uses the new base image --- apps/data-seeding/docker/Dockerfile | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/apps/data-seeding/docker/Dockerfile b/apps/data-seeding/docker/Dockerfile index 797cc66f..3b37085a 100644 --- a/apps/data-seeding/docker/Dockerfile +++ b/apps/data-seeding/docker/Dockerfile @@ -1,19 +1,21 @@ # syntax = docker/dockerfile:1.2 -FROM python:3.8.6-slim-buster as compile-image - -WORKDIR /root - -RUN apt-get update -RUN apt-get install -y --no-install-recommends git gcc g++ libpq-dev gawk jq telnet wget openssl iputils-ping gnupg socat bash procps make python2 cargo +#FROM python:3.8.6-slim-buster as compile-image +FROM registry.gitlab.com/grassrootseconomics/cic-base-images:python-3.8.6-dev-5ab8bf45 RUN mkdir -vp /usr/local/etc/cic +COPY data-seeding/package.json \ + data-seeding/package-lock.json \ + . + +RUN npm install + COPY data-seeding/requirements.txt . +ARG EXTRA_INDEX_URL="https://pip.grassrootseconomics.net:8433" +ARG GITLAB_PYTHON_REGISTRY="https://gitlab.com/api/v4/projects/27624814/packages/pypi/simple" +RUN pip install --extra-index-url $GITLAB_PYTHON_REGISTRY --extra-index-url $EXTRA_INDEX_URL -r requirements.txt + COPY data-seeding/ . -ARG EXTRA_INDEX_URL="https://pip.grassrootseconomics.net:8433" -RUN pip install --extra-index-url $EXTRA_INDEX_URL -r requirements.txt - - ENTRYPOINT [ ] From 29e91fafab0395498a0ad89404b22571e264d89e Mon Sep 17 00:00:00 2001 From: Blair Vanderlugt Date: Mon, 28 Jun 2021 22:40:54 +0000 Subject: [PATCH 5/6] integration updates: meta imports --- apps/data-seeding/cic_meta/import_meta.js | 19 ++++++++++++---- apps/data-seeding/cic_ussd/import_balance.py | 2 +- apps/data-seeding/cic_ussd/import_users.py | 23 +++++++++++++++----- 3 files changed, 34 insertions(+), 10 deletions(-) diff --git a/apps/data-seeding/cic_meta/import_meta.js b/apps/data-seeding/cic_meta/import_meta.js index 2884fa95..7bdda53a 100644 --- a/apps/data-seeding/cic_meta/import_meta.js +++ b/apps/data-seeding/cic_meta/import_meta.js @@ -31,13 +31,16 @@ function sendit(uid, envelope) { const req = http.request(url + uid, opts, (res) => { res.on('data', process.stdout.write); res.on('end', () => { + if (!res.complete) { + console.log('The connection was terminated while the message was being sent.') + } console.log('result', res.statusCode, res.headers); }); }); - if (!req.write(d)) { - console.error('foo', d); - process.exit(1); - } + req.on('error', (err) => { + console.log('ERROR when talking to meta', err) + }) + req.write(d) req.end(); } @@ -55,6 +58,7 @@ function doOne(keystore, filePath) { const s = new crdt.Syncable(uid, o); s.setSigner(signer); s.onwrap = (env) => { + console.log(`Sending uid: ${uid} and env: ${env} to meta`) sendit(uid, env); }; s.sign(); @@ -84,6 +88,7 @@ let batchCount = 0; function importMeta(keystore) { + console.log('Running importMeta....') let err; let files; @@ -94,6 +99,11 @@ function importMeta(keystore) { setTimeout(importMeta, batchDelay, keystore); return; } + console.log(`Trying to read ${files.length} files`) + if (files === 0) { + console.log(`ERROR did not find any files under ${workDir}. \nLooks like there is no work for me, bailing!`) + process.exit(1) + } let limit = batchSize; if (files.length < limit) { limit = files.length; @@ -108,6 +118,7 @@ function importMeta(keystore) { doOne(keystore, filePath); count++; batchCount++; + //console.log('done one', count, batchCount) if (batchCount == batchSize) { console.debug('reached batch size, breathing'); batchCount=0; diff --git a/apps/data-seeding/cic_ussd/import_balance.py b/apps/data-seeding/cic_ussd/import_balance.py index 19315b11..56435608 100644 --- a/apps/data-seeding/cic_ussd/import_balance.py +++ b/apps/data-seeding/cic_ussd/import_balance.py @@ -70,7 +70,7 @@ args_override = { 'REDIS_DB': getattr(args, 'redis_db'), 'META_HOST': getattr(args, 'meta_host'), 'META_PORT': getattr(args, 'meta_port'), - 'KEYSTORE_FILE_PATH': getattr(args, 'key-file') + 'KEYSTORE_FILE_PATH': getattr(args, 'y') } config.dict_override(args_override, 'cli flag') config.censor('PASSWORD', 'DATABASE') diff --git a/apps/data-seeding/cic_ussd/import_users.py b/apps/data-seeding/cic_ussd/import_users.py index 07ef4dd2..e44b92fb 100644 --- a/apps/data-seeding/cic_ussd/import_users.py +++ b/apps/data-seeding/cic_ussd/import_users.py @@ -31,6 +31,9 @@ argparser.add_argument('--redis-db', dest='redis_db', type=int, help='redis db t argparser.add_argument('--batch-size', dest='batch_size', default=100, type=int, help='burst size of sending transactions to node') # batch size should be slightly below cumulative gas limit worth, eg 80000 gas txs with 8000000 limit is a bit less than 100 batch size argparser.add_argument('--batch-delay', dest='batch_delay', default=3, type=int, help='seconds delay between batches') argparser.add_argument('--timeout', default=60.0, type=float, help='Callback timeout') +argparser.add_argument('--ussd-host', dest='ussd_host', type=str, help="host to ussd app responsible for processing ussd requests.") +argparser.add_argument('--ussd-port', dest='ussd_port', type=str, help="port to ussd app responsible for processing ussd requests.") +argparser.add_argument('--ussd-no-ssl', dest='ussd_no_ssl', help='do not use ssl (careful)', action='store_true') argparser.add_argument('-q', type=str, default='cic-eth', help='Task queue') argparser.add_argument('-v', action='store_true', help='Be verbose') argparser.add_argument('-vv', action='store_true', help='Be more verbose') @@ -84,13 +87,21 @@ chain_str = str(chain_spec) batch_size = args.batch_size batch_delay = args.batch_delay - +ussd_port = args.ussd_port +ussd_host = args.ussd_host +ussd_no_ssl = args.ussd_no_ssl +if ussd_no_ssl is True: + ussd_ssl = False +else: + ussd_ssl = True def build_ussd_request(phone, host, port, service_code, username, password, ssl=False): url = 'http' if ssl: url += 's' - url += '://{}:{}'.format(host, port) + url += '://{}'.format(host) + if port: + url += ':{}'.format(port) url += '/?username={}&password={}'.format(username, password) logg.info('ussd service url {}'.format(url)) @@ -119,11 +130,13 @@ def register_ussd(i, u): logg.debug('tel {} {}'.format(u.tel, phone)) req = build_ussd_request( phone, - 'localhost', - config.get('CIC_USER_USSD_SVC_SERVICE_PORT'), + ussd_host, + ussd_port, config.get('APP_SERVICE_CODE'), '', - '') + '', + ussd_ssl + ) response = urllib.request.urlopen(req) response_data = response.read().decode('utf-8') state = response_data[:3] From b1d5d45eefacb7c37308d2f39568f58a7792239f Mon Sep 17 00:00:00 2001 From: Philip Wafula Date: Tue, 29 Jun 2021 10:49:25 +0000 Subject: [PATCH 6/6] Philip/db session management --- apps/cic-ussd/cic_ussd/db/enum.py | 9 + apps/cic-ussd/cic_ussd/db/models/account.py | 29 +-- apps/cic-ussd/cic_ussd/operations.py | 110 ++++++++---- apps/cic-ussd/cic_ussd/phone_number.py | 14 -- apps/cic-ussd/cic_ussd/processor.py | 168 +++++++++++------- apps/cic-ussd/cic_ussd/requests.py | 25 ++- .../runnable/daemons/cic_user_server.py | 16 +- .../runnable/daemons/cic_user_ussd_server.py | 10 +- .../cic_ussd/state_machine/logic/balance.py | 5 +- .../cic_ussd/state_machine/logic/menu.py | 16 +- .../cic_ussd/state_machine/logic/pin.py | 48 ++--- .../cic_ussd/state_machine/logic/sms.py | 6 +- .../state_machine/logic/transaction.py | 65 ++++--- .../cic_ussd/state_machine/logic/user.py | 65 ++++--- .../cic_ussd/state_machine/logic/validator.py | 8 +- apps/cic-ussd/cic_ussd/validator.py | 29 ++- 16 files changed, 370 insertions(+), 253 deletions(-) create mode 100644 apps/cic-ussd/cic_ussd/db/enum.py diff --git a/apps/cic-ussd/cic_ussd/db/enum.py b/apps/cic-ussd/cic_ussd/db/enum.py new file mode 100644 index 00000000..cb9761d3 --- /dev/null +++ b/apps/cic-ussd/cic_ussd/db/enum.py @@ -0,0 +1,9 @@ +# standard import +from enum import IntEnum + + +class AccountStatus(IntEnum): + PENDING = 1 + ACTIVE = 2 + LOCKED = 3 + RESET = 4 diff --git a/apps/cic-ussd/cic_ussd/db/models/account.py b/apps/cic-ussd/cic_ussd/db/models/account.py index 18f5e370..1e073625 100644 --- a/apps/cic-ussd/cic_ussd/db/models/account.py +++ b/apps/cic-ussd/cic_ussd/db/models/account.py @@ -1,19 +1,13 @@ # standard imports -from enum import IntEnum - -# third party imports -from sqlalchemy import Column, Integer, String # local imports +from cic_ussd.db.enum import AccountStatus from cic_ussd.db.models.base import SessionBase from cic_ussd.encoder import check_password_hash, create_password_hash - -class AccountStatus(IntEnum): - PENDING = 1 - ACTIVE = 2 - LOCKED = 3 - RESET = 4 +# third party imports +from sqlalchemy import Column, Integer, String +from sqlalchemy.orm.session import Session class Account(SessionBase): @@ -30,6 +24,21 @@ class Account(SessionBase): account_status = Column(Integer) preferred_language = Column(String) + @staticmethod + def get_by_phone_number(phone_number: str, session: Session): + """Retrieves an account from a phone number. + :param phone_number: The E164 format of a phone number. + :type phone_number:str + :param session: + :type session: + :return: An account object. + :rtype: Account + """ + session = SessionBase.bind_session(session=session) + account = session.query(Account).filter_by(phone_number=phone_number).first() + SessionBase.release_session(session=session) + return account + def __init__(self, blockchain_address, phone_number): self.blockchain_address = blockchain_address self.phone_number = phone_number diff --git a/apps/cic-ussd/cic_ussd/operations.py b/apps/cic-ussd/cic_ussd/operations.py index 5f2ca7a2..bd0e7476 100644 --- a/apps/cic-ussd/cic_ussd/operations.py +++ b/apps/cic-ussd/cic_ussd/operations.py @@ -6,11 +6,13 @@ import logging import celery import i18n from cic_eth.api.api_task import Api +from sqlalchemy.orm.session import Session from tinydb.table import Document from typing import Optional # local imports from cic_ussd.db.models.account import Account +from cic_ussd.db.models.base import SessionBase from cic_ussd.db.models.ussd_session import UssdSession from cic_ussd.db.models.task_tracker import TaskTracker from cic_ussd.menu.ussd_menu import UssdMenu @@ -22,15 +24,18 @@ from cic_ussd.validator import check_known_user, validate_response_type logg = logging.getLogger() -def add_tasks_to_tracker(task_uuid): - """ - This function takes tasks spawned over api interfaces and records their creation time for tracking. +def add_tasks_to_tracker(session, task_uuid: str): + """This function takes tasks spawned over api interfaces and records their creation time for tracking. + :param session: + :type session: :param task_uuid: The uuid for an initiated task. :type task_uuid: str """ + session = SessionBase.bind_session(session=session) task_record = TaskTracker(task_uuid=task_uuid) - TaskTracker.session.add(task_record) - TaskTracker.session.commit() + session.add(task_record) + session.flush() + SessionBase.release_session(session=session) def define_response_with_content(headers: list, response: str) -> tuple: @@ -95,6 +100,7 @@ def create_or_update_session( service_code: str, user_input: str, current_menu: str, + session, session_data: Optional[dict] = None) -> InMemoryUssdSession: """ Handles the creation or updating of session as necessary. @@ -108,12 +114,15 @@ def create_or_update_session( :type user_input: str :param current_menu: Menu name that is currently being displayed on the ussd session :type current_menu: str + :param session: + :type session: :param session_data: Any additional data that was persisted during the user's interaction with the system. :type session_data: dict. :return: ussd session object :rtype: InMemoryUssdSession """ - existing_ussd_session = UssdSession.session.query(UssdSession).filter_by( + session = SessionBase.bind_session(session=session) + existing_ussd_session = session.query(UssdSession).filter_by( external_session_id=external_session_id).first() if existing_ussd_session: @@ -132,20 +141,25 @@ def create_or_update_session( current_menu=current_menu, session_data=session_data ) + SessionBase.release_session(session=session) return ussd_session -def get_account_status(phone_number) -> str: +def get_account_status(phone_number, session: Session) -> str: """Get the status of a user's account. :param phone_number: The phone number to be checked. :type phone_number: str + :param session: + :type session: :return: The user account status. :rtype: str """ - user = Account.session.query(Account).filter_by(phone_number=phone_number).first() - status = user.get_account_status() - Account.session.add(user) - Account.session.commit() + session = SessionBase.bind_session(session=session) + account = Account.get_by_phone_number(phone_number=phone_number, session=session) + status = account.get_account_status() + session.add(account) + session.flush() + SessionBase.release_session(session=session) return status @@ -165,6 +179,7 @@ def initiate_account_creation_request(chain_str: str, external_session_id: str, phone_number: str, service_code: str, + session, user_input: str) -> str: """This function issues a task to create a blockchain account on cic-eth. It then creates a record of the ussd session corresponding to the creation of the account and returns a response denoting that the user's account is @@ -177,6 +192,8 @@ def initiate_account_creation_request(chain_str: str, :type phone_number: str :param service_code: The service code dialed. :type service_code: str + :param session: + :type session: :param user_input: The input entered by the user. :type user_input: str :return: A response denoting that the account is being created. @@ -190,7 +207,7 @@ def initiate_account_creation_request(chain_str: str, creation_task_id = cic_eth_api.create_account().id # record task initiation time - add_tasks_to_tracker(task_uuid=creation_task_id) + add_tasks_to_tracker(task_uuid=creation_task_id, session=session) # cache account creation data cache_account_creation_task_id(phone_number=phone_number, task_id=creation_task_id) @@ -204,6 +221,7 @@ def initiate_account_creation_request(chain_str: str, phone=phone_number, service_code=service_code, current_menu=current_menu.get('name'), + session=session, user_input=user_input) # define response to relay to user @@ -268,12 +286,14 @@ def cache_account_creation_task_id(phone_number: str, task_id: str): redis_cache.persist(name=task_id) -def process_current_menu(ussd_session: Optional[dict], user: Account, user_input: str) -> Document: +def process_current_menu(account: Account, session: Session, ussd_session: Optional[dict], user_input: str) -> Document: """This function checks user input and returns a corresponding ussd menu :param ussd_session: An in db ussd session object. :type ussd_session: UssdSession - :param user: A user object. - :type user: Account + :param account: A account object. + :type account: Account + :param session: + :type session: :param user_input: The user's input. :type user_input: str :return: An in memory ussd menu object. @@ -285,7 +305,13 @@ def process_current_menu(ussd_session: Optional[dict], user: Account, user_input else: # get current state latest_input = get_latest_input(user_input=user_input) - current_menu = process_request(ussd_session=ussd_session, user_input=latest_input, user=user) + session = SessionBase.bind_session(session=session) + current_menu = process_request( + account=account, + session=session, + ussd_session=ussd_session, + user_input=latest_input) + SessionBase.release_session(session=session) return current_menu @@ -294,6 +320,7 @@ def process_menu_interaction_requests(chain_str: str, phone_number: str, queue: str, service_code: str, + session, user_input: str) -> str: """This function handles requests intended for interaction with ussd menu, it checks whether a user matching the provided phone number exists and in the absence of which it creates an account for the user. @@ -308,25 +335,29 @@ def process_menu_interaction_requests(chain_str: str, :type queue: str :param service_code: The service dialed by the user making the request. :type service_code: str + :param session: + :type session: :param user_input: The inputs entered by the user. :type user_input: str :return: A response based on the request received. :rtype: str """ # check whether the user exists - if not check_known_user(phone=phone_number): + if not check_known_user(phone_number=phone_number, session=session): response = initiate_account_creation_request(chain_str=chain_str, external_session_id=external_session_id, phone_number=phone_number, service_code=service_code, + session=session, user_input=user_input) else: - # get user - user = Account.session.query(Account).filter_by(phone_number=phone_number).first() + # get account + session = SessionBase.bind_session(session=session) + account = Account.get_by_phone_number(phone_number=phone_number, session=session) # retrieve and cache user's metadata - blockchain_address = user.blockchain_address + blockchain_address = account.blockchain_address s_query_person_metadata = celery.signature( 'cic_ussd.tasks.metadata.query_person_metadata', [blockchain_address] @@ -334,24 +365,25 @@ def process_menu_interaction_requests(chain_str: str, s_query_person_metadata.apply_async(queue='cic-ussd') # find any existing ussd session - existing_ussd_session = UssdSession.session.query(UssdSession).filter_by( - external_session_id=external_session_id).first() + existing_ussd_session = session.query(UssdSession).filter_by(external_session_id=external_session_id).first() # validate user inputs if existing_ussd_session: current_menu = process_current_menu( + account=account, + session=session, ussd_session=existing_ussd_session.to_json(), - user=user, user_input=user_input ) else: current_menu = process_current_menu( + account=account, + session=session, ussd_session=None, - user=user, user_input=user_input ) - last_ussd_session = retrieve_most_recent_ussd_session(phone_number=user.phone_number) + last_ussd_session = retrieve_most_recent_ussd_session(phone_number=account.phone_number, session=session) if last_ussd_session: # create or update the ussd session as appropriate @@ -361,6 +393,7 @@ def process_menu_interaction_requests(chain_str: str, service_code=service_code, user_input=user_input, current_menu=current_menu.get('name'), + session=session, session_data=last_ussd_session.session_data ) else: @@ -369,15 +402,17 @@ def process_menu_interaction_requests(chain_str: str, phone=phone_number, service_code=service_code, user_input=user_input, - current_menu=current_menu.get('name') + current_menu=current_menu.get('name'), + session=session ) # define appropriate response response = custom_display_text( + account=account, display_key=current_menu.get('display_key'), menu_name=current_menu.get('name'), + session=session, ussd_session=ussd_session.to_json(), - user=user ) # check that the response from the processor is valid @@ -386,21 +421,26 @@ def process_menu_interaction_requests(chain_str: str, # persist session to db persist_session_to_db_task(external_session_id=external_session_id, queue=queue) + SessionBase.release_session(session=session) return response -def reset_pin(phone_number: str) -> str: +def reset_pin(phone_number: str, session: Session) -> str: """Reset account status from Locked to Pending. :param phone_number: The phone number belonging to the account to be unlocked. :type phone_number: str + :param session: + :type session: :return: The status of the pin reset. :rtype: str """ - user = Account.session.query(Account).filter_by(phone_number=phone_number).first() - user.reset_account_pin() - Account.session.add(user) - Account.session.commit() + session = SessionBase.bind_session(session=session) + account = Account.get_by_phone_number(phone_number=phone_number, session=session) + account.reset_account_pin() + session.add(account) + session.flush() + SessionBase.release_session(session=session) response = f'Pin reset for user {phone_number} is successful!' return response @@ -438,11 +478,13 @@ def update_ussd_session( return session -def save_to_in_memory_ussd_session_data(queue: str, session_data: dict, ussd_session: dict): +def save_to_in_memory_ussd_session_data(queue: str, session: Session, session_data: dict, ussd_session: dict): """This function is used to save information to the session data attribute of a ussd session object in the redis cache. :param queue: The queue on which the celery task should run. :type queue: str + :param session: + :type session: :param session_data: A dictionary containing data for a specific ussd session in redis that needs to be saved temporarily. :type session_data: dict @@ -473,7 +515,7 @@ def save_to_in_memory_ussd_session_data(queue: str, session_data: dict, ussd_ses service_code=in_redis_ussd_session.get('service_code'), user_input=in_redis_ussd_session.get('user_input'), current_menu=in_redis_ussd_session.get('state'), + session=session, session_data=session_data ) persist_session_to_db_task(external_session_id=external_session_id, queue=queue) - diff --git a/apps/cic-ussd/cic_ussd/phone_number.py b/apps/cic-ussd/cic_ussd/phone_number.py index 7556709f..1fc94f6f 100644 --- a/apps/cic-ussd/cic_ussd/phone_number.py +++ b/apps/cic-ussd/cic_ussd/phone_number.py @@ -33,19 +33,5 @@ def process_phone_number(phone_number: str, region: str): return parsed_phone_number - -def get_user_by_phone_number(phone_number: str) -> Optional[Account]: - """This function queries the database for a user based on the provided phone number. - :param phone_number: A valid phone number. - :type phone_number: str - :return: A user object matching a given phone number - :rtype: Account|None - """ - # consider adding region to user's metadata - phone_number = process_phone_number(phone_number=phone_number, region=E164Format.region) - user = Account.session.query(Account).filter_by(phone_number=phone_number).first() - return user - - class Support: phone_number = None diff --git a/apps/cic-ussd/cic_ussd/processor.py b/apps/cic-ussd/cic_ussd/processor.py index adf81f12..a7d515a3 100644 --- a/apps/cic-ussd/cic_ussd/processor.py +++ b/apps/cic-ussd/cic_ussd/processor.py @@ -2,25 +2,26 @@ import datetime import logging import json -import re from typing import Optional # third party imports -import celery from sqlalchemy import desc from cic_eth.api import Api +from sqlalchemy.orm.session import Session from tinydb.table import Document # local imports from cic_ussd.account import define_account_tx_metadata, retrieve_account_statement from cic_ussd.balance import BalanceManager, compute_operational_balance, get_cached_operational_balance from cic_ussd.chain import Chain -from cic_ussd.db.models.account import AccountStatus, Account +from cic_ussd.db.models.account import Account +from cic_ussd.db.models.base import SessionBase from cic_ussd.db.models.ussd_session import UssdSession -from cic_ussd.error import MetadataNotFoundError, SeppukuError +from cic_ussd.db.enum import AccountStatus +from cic_ussd.error import SeppukuError from cic_ussd.menu.ussd_menu import UssdMenu from cic_ussd.metadata import blockchain_address_to_metadata_pointer -from cic_ussd.phone_number import get_user_by_phone_number, Support +from cic_ussd.phone_number import Support from cic_ussd.redis import cache_data, create_cached_data_key, get_cached_data from cic_ussd.state_machine import UssdStateMachine from cic_ussd.conversions import to_wei, from_wei @@ -62,47 +63,48 @@ def retrieve_token_symbol(chain_str: str = Chain.spec.__str__()): raise SeppukuError(f'Could not retrieve default token for: {chain_str}') -def process_pin_authorization(display_key: str, user: Account, **kwargs) -> str: - """ - This method provides translation for all ussd menu entries that follow the pin authorization pattern. +def process_pin_authorization(account: Account, display_key: str, **kwargs) -> str: + """This method provides translation for all ussd menu entries that follow the pin authorization pattern. + :param account: The account in a running USSD session. + :type account: Account :param display_key: The path in the translation files defining an appropriate ussd response :type display_key: str - :param user: The user in a running USSD session. - :type user: Account :param kwargs: Any additional information required by the text values in the internationalization files. :type kwargs :return: A string value corresponding the ussd menu's text value. :rtype: str """ remaining_attempts = 3 - if user.failed_pin_attempts > 0: + if account.failed_pin_attempts > 0: return translation_for( key=f'{display_key}.retry', - preferred_language=user.preferred_language, - remaining_attempts=(remaining_attempts - user.failed_pin_attempts) + preferred_language=account.preferred_language, + remaining_attempts=(remaining_attempts - account.failed_pin_attempts) ) else: return translation_for( key=f'{display_key}.first', - preferred_language=user.preferred_language, + preferred_language=account.preferred_language, **kwargs ) -def process_exit_insufficient_balance(display_key: str, user: Account, ussd_session: dict): +def process_exit_insufficient_balance(account: Account, display_key: str, session: Session, ussd_session: dict): """This function processes the exit menu letting users their account balance is insufficient to perform a specific transaction. + :param account: The account requesting access to the ussd menu. + :type account: Account :param display_key: The path in the translation files defining an appropriate ussd response :type display_key: str - :param user: The user requesting access to the ussd menu. - :type user: Account + :param session: + :type session: :param ussd_session: A JSON serialized in-memory ussd session object :type ussd_session: dict :return: Corresponding translation text response :rtype: str """ # get account balance - operational_balance = get_cached_operational_balance(blockchain_address=user.blockchain_address) + operational_balance = get_cached_operational_balance(blockchain_address=account.blockchain_address) # compile response data user_input = ussd_session.get('user_input').split('*')[-1] @@ -112,13 +114,13 @@ def process_exit_insufficient_balance(display_key: str, user: Account, ussd_sess token_symbol = retrieve_token_symbol() recipient_phone_number = ussd_session.get('session_data').get('recipient_phone_number') - recipient = get_user_by_phone_number(phone_number=recipient_phone_number) + recipient = Account.get_by_phone_number(phone_number=recipient_phone_number, session=session) tx_recipient_information = define_account_tx_metadata(user=recipient) return translation_for( key=display_key, - preferred_language=user.preferred_language, + preferred_language=account.preferred_language, amount=from_wei(transaction_amount), token_symbol=token_symbol, recipient_information=tx_recipient_information, @@ -126,12 +128,14 @@ def process_exit_insufficient_balance(display_key: str, user: Account, ussd_sess ) -def process_exit_successful_transaction(display_key: str, user: Account, ussd_session: dict): +def process_exit_successful_transaction(account: Account, display_key: str, session: Session, ussd_session: dict): """This function processes the exit menu after a successful initiation for a transfer of tokens. + :param account: The account requesting access to the ussd menu. + :type account: Account :param display_key: The path in the translation files defining an appropriate ussd response :type display_key: str - :param user: The user requesting access to the ussd menu. - :type user: Account + :param session: + :type session: :param ussd_session: A JSON serialized in-memory ussd session object :type ussd_session: dict :return: Corresponding translation text response @@ -140,13 +144,13 @@ def process_exit_successful_transaction(display_key: str, user: Account, ussd_se transaction_amount = to_wei(int(ussd_session.get('session_data').get('transaction_amount'))) token_symbol = retrieve_token_symbol() recipient_phone_number = ussd_session.get('session_data').get('recipient_phone_number') - recipient = get_user_by_phone_number(phone_number=recipient_phone_number) + recipient = Account.get_by_phone_number(phone_number=recipient_phone_number, session=session) tx_recipient_information = define_account_tx_metadata(user=recipient) - tx_sender_information = define_account_tx_metadata(user=user) + tx_sender_information = define_account_tx_metadata(user=account) return translation_for( key=display_key, - preferred_language=user.preferred_language, + preferred_language=account.preferred_language, transaction_amount=from_wei(transaction_amount), token_symbol=token_symbol, recipient_information=tx_recipient_information, @@ -154,13 +158,15 @@ def process_exit_successful_transaction(display_key: str, user: Account, ussd_se ) -def process_transaction_pin_authorization(user: Account, display_key: str, ussd_session: dict): +def process_transaction_pin_authorization(account: Account, display_key: str, session: Session, ussd_session: dict): """This function processes pin authorization where making a transaction is concerned. It constructs a pre-transaction response menu that shows the details of the transaction. - :param user: The user requesting access to the ussd menu. - :type user: Account + :param account: The account requesting access to the ussd menu. + :type account: Account :param display_key: The path in the translation files defining an appropriate ussd response :type display_key: str + :param session: + :type session: :param ussd_session: The USSD session determining what user data needs to be extracted and added to the menu's text values. :type ussd_session: UssdSession @@ -169,16 +175,16 @@ def process_transaction_pin_authorization(user: Account, display_key: str, ussd_ """ # compile response data recipient_phone_number = ussd_session.get('session_data').get('recipient_phone_number') - recipient = get_user_by_phone_number(phone_number=recipient_phone_number) + recipient = Account.get_by_phone_number(phone_number=recipient_phone_number, session=session) tx_recipient_information = define_account_tx_metadata(user=recipient) - tx_sender_information = define_account_tx_metadata(user=user) + tx_sender_information = define_account_tx_metadata(user=account) token_symbol = retrieve_token_symbol() user_input = ussd_session.get('session_data').get('transaction_amount') transaction_amount = to_wei(value=int(user_input)) logg.debug('Requires integration to determine user tokens.') return process_pin_authorization( - user=user, + account=account, display_key=display_key, recipient_information=tx_recipient_information, transaction_amount=from_wei(transaction_amount), @@ -187,14 +193,12 @@ def process_transaction_pin_authorization(user: Account, display_key: str, ussd_ ) -def process_account_balances(user: Account, display_key: str, ussd_session: dict): +def process_account_balances(user: Account, display_key: str): """ :param user: :type user: :param display_key: :type display_key: - :param ussd_session: - :type ussd_session: :return: :rtype: """ @@ -295,14 +299,12 @@ def process_display_user_metadata(user: Account, display_key: str): ) -def process_account_statement(user: Account, display_key: str, ussd_session: dict): +def process_account_statement(user: Account, display_key: str): """ :param user: :type user: :param display_key: :type display_key: - :param ussd_session: - :type ussd_session: :return: :rtype: """ @@ -404,23 +406,26 @@ def process_start_menu(display_key: str, user: Account): ) -def retrieve_most_recent_ussd_session(phone_number: str) -> UssdSession: +def retrieve_most_recent_ussd_session(phone_number: str, session: Session) -> UssdSession: # get last ussd session based on user phone number - last_ussd_session = UssdSession.session\ - .query(UssdSession)\ + session = SessionBase.bind_session(session=session) + last_ussd_session = session.query(UssdSession)\ .filter_by(msisdn=phone_number)\ .order_by(desc(UssdSession.created))\ .first() + SessionBase.release_session(session=session) return last_ussd_session -def process_request(user_input: str, user: Account, ussd_session: Optional[dict] = None) -> Document: +def process_request(account: Account, session, user_input: str, ussd_session: Optional[dict] = None) -> Document: """This function assesses a request based on the user from the request comes, the session_id and the user's input. It determines whether the request translates to a return to an existing session by checking whether the provided session id exists in the database or whether the creation of a new ussd session object is warranted. It then returns the appropriate ussd menu text values. - :param user: The user requesting access to the ussd menu. - :type user: Account + :param account: The account requesting access to the ussd menu. + :type account: Account + :param session: + :type session: :param user_input: The value a user enters in the ussd menu. :type user_input: str :param ussd_session: A JSON serialized in-memory ussd session object @@ -433,11 +438,15 @@ def process_request(user_input: str, user: Account, ussd_session: Optional[dict] if user_input == "0": return UssdMenu.parent_menu(menu_name=ussd_session.get('state')) else: - successive_state = next_state(ussd_session=ussd_session, user=user, user_input=user_input) + successive_state = next_state( + account=account, + session=session, + ussd_session=ussd_session, + user_input=user_input) return UssdMenu.find_by_name(name=successive_state) else: - if user.has_valid_pin(): - last_ussd_session = retrieve_most_recent_ussd_session(phone_number=user.phone_number) + if account.has_valid_pin(): + last_ussd_session = retrieve_most_recent_ussd_session(phone_number=account.phone_number, session=session) if last_ussd_session: # get last state @@ -456,28 +465,30 @@ def process_request(user_input: str, user: Account, ussd_session: Optional[dict] else: return UssdMenu.find_by_name(name=last_state) else: - if user.failed_pin_attempts >= 3 and user.get_account_status() == AccountStatus.LOCKED.name: + if account.failed_pin_attempts >= 3 and account.get_account_status() == AccountStatus.LOCKED.name: return UssdMenu.find_by_name(name='exit_pin_blocked') - elif user.preferred_language is None: + elif account.preferred_language is None: return UssdMenu.find_by_name(name='initial_language_selection') else: return UssdMenu.find_by_name(name='initial_pin_entry') -def next_state(ussd_session: dict, user: Account, user_input: str) -> str: +def next_state(account: Account, session, ussd_session: dict, user_input: str) -> str: """This function navigates the state machine based on the ussd session object and user inputs it receives. It checks the user input and provides the successive state in the state machine. It then updates the session's state attribute with the new state. + :param account: The account requesting access to the ussd menu. + :type account: Account + :param session: + :type session: :param ussd_session: A JSON serialized in-memory ussd session object :type ussd_session: dict - :param user: The user requesting access to the ussd menu. - :type user: Account :param user_input: The value a user enters in the ussd menu. :type user_input: str :return: A string value corresponding the successive give a specific state in the state machine. """ state_machine = UssdStateMachine(ussd_session=ussd_session) - state_machine.scan_data((user_input, ussd_session, user)) + state_machine.scan_data((user_input, ussd_session, account, session)) new_state = state_machine.state return new_state @@ -492,42 +503,63 @@ def process_exit_invalid_menu_option(display_key: str, preferred_language: str): def custom_display_text( + account: Account, display_key: str, menu_name: str, - ussd_session: dict, - user: Account) -> str: + session: Session, + ussd_session: dict) -> str: """This function extracts the appropriate session data based on the current menu name. It then inserts them as keywords in the i18n function. + :param account: The account in a running USSD session. + :type account: Account :param display_key: The path in the translation files defining an appropriate ussd response :type display_key: str :param menu_name: The name by which a specific menu can be identified. :type menu_name: str - :param user: The user in a running USSD session. - :type user: Account + :param session: + :type session: :param ussd_session: A JSON serialized in-memory ussd session object :type ussd_session: dict :return: A string value corresponding the ussd menu's text value. :rtype: str """ if menu_name == 'transaction_pin_authorization': - return process_transaction_pin_authorization(display_key=display_key, user=user, ussd_session=ussd_session) + return process_transaction_pin_authorization( + account=account, + display_key=display_key, + session=session, + ussd_session=ussd_session) elif menu_name == 'exit_insufficient_balance': - return process_exit_insufficient_balance(display_key=display_key, user=user, ussd_session=ussd_session) + return process_exit_insufficient_balance( + account=account, + display_key=display_key, + session=session, + ussd_session=ussd_session) elif menu_name == 'exit_successful_transaction': - return process_exit_successful_transaction(display_key=display_key, user=user, ussd_session=ussd_session) + return process_exit_successful_transaction( + account=account, + display_key=display_key, + session=session, + ussd_session=ussd_session) elif menu_name == 'start': - return process_start_menu(display_key=display_key, user=user) + return process_start_menu(display_key=display_key, user=account) elif 'pin_authorization' in menu_name: - return process_pin_authorization(display_key=display_key, user=user) + return process_pin_authorization( + account=account, + display_key=display_key, + session=session) elif 'enter_current_pin' in menu_name: - return process_pin_authorization(display_key=display_key, user=user) + return process_pin_authorization( + account=account, + display_key=display_key, + session=session) elif menu_name == 'account_balances': - return process_account_balances(display_key=display_key, user=user, ussd_session=ussd_session) + return process_account_balances(display_key=display_key, user=account) elif 'transaction_set' in menu_name: - return process_account_statement(display_key=display_key, user=user, ussd_session=ussd_session) + return process_account_statement(display_key=display_key, user=account) elif menu_name == 'display_user_metadata': - return process_display_user_metadata(display_key=display_key, user=user) + return process_display_user_metadata(display_key=display_key, user=account) elif menu_name == 'exit_invalid_menu_option': - return process_exit_invalid_menu_option(display_key=display_key, preferred_language=user.preferred_language) + return process_exit_invalid_menu_option(display_key=display_key, preferred_language=account.preferred_language) else: - return translation_for(key=display_key, preferred_language=user.preferred_language) + return translation_for(key=display_key, preferred_language=account.preferred_language) diff --git a/apps/cic-ussd/cic_ussd/requests.py b/apps/cic-ussd/cic_ussd/requests.py index 8d436f45..897d8539 100644 --- a/apps/cic-ussd/cic_ussd/requests.py +++ b/apps/cic-ussd/cic_ussd/requests.py @@ -8,9 +8,12 @@ from urllib.parse import urlparse, parse_qs # third-party imports from sqlalchemy import desc +from sqlalchemy.orm.session import Session # local imports -from cic_ussd.db.models.account import AccountStatus, Account +from cic_ussd.db.models.account import Account +from cic_ussd.db.models.base import SessionBase +from cic_ussd.db.enum import AccountStatus from cic_ussd.operations import get_account_status, reset_pin from cic_ussd.validator import check_known_user @@ -72,24 +75,26 @@ def get_account_creation_callback_request_data(env: dict) -> tuple: return status, task_id, result -def process_pin_reset_requests(env: dict, phone_number: str): +def process_pin_reset_requests(env: dict, phone_number: str, session: Session): """This function processes requests that are responsible for the pin reset functionality. It processes GET and PUT requests responsible for returning an account's status and :param env: A dictionary of values representing data sent on the api. :type env: dict :param phone_number: The phone of the user whose pin is being reset. :type phone_number: str + :param session: + :type session: :return: A response denoting the result of the request to reset the user's pin. :rtype: str """ - if not check_known_user(phone=phone_number): + if not check_known_user(phone_number=phone_number, session=session): return f'No user matching {phone_number} was found.', '404 Not Found' if get_request_method(env) == 'PUT': - return reset_pin(phone_number=phone_number), '200 OK' + return reset_pin(phone_number=phone_number, session=session), '200 OK' if get_request_method(env) == 'GET': - status = get_account_status(phone_number=phone_number) + status = get_account_status(phone_number=phone_number, session=session) response = { 'status': f'{status}' } @@ -97,16 +102,18 @@ def process_pin_reset_requests(env: dict, phone_number: str): return response, '200 OK' -def process_locked_accounts_requests(env: dict) -> tuple: +def process_locked_accounts_requests(env: dict, session: Session) -> tuple: """This function authenticates staff requests and returns a serialized JSON formatted list of blockchain addresses of accounts for which the PIN has been locked due to too many failed attempts. :param env: A dictionary of values representing data sent on the api. :type env: dict + :param session: + :type session: :return: A tuple containing a serialized list of blockchain addresses for locked accounts and corresponding message for the response. :rtype: tuple """ - logg.debug('Authentication requires integration with cic-auth') + session = SessionBase.bind_session(session=session) response = '' if get_request_method(env) == 'GET': @@ -123,12 +130,14 @@ def process_locked_accounts_requests(env: dict) -> tuple: else: limit = r[1] - locked_accounts = Account.session.query(Account.blockchain_address).filter( + locked_accounts = session.query(Account.blockchain_address).filter( Account.account_status == AccountStatus.LOCKED.value, Account.failed_pin_attempts >= 3).order_by(desc(Account.updated)).offset(offset).limit(limit).all() # convert lists to scalar blockchain addresses locked_accounts = [blockchain_address for (blockchain_address, ) in locked_accounts] + + SessionBase.release_session(session=session) response = json.dumps(locked_accounts) return response, '200 OK' return response, '405 Play by the rules' diff --git a/apps/cic-ussd/cic_ussd/runnable/daemons/cic_user_server.py b/apps/cic-ussd/cic_ussd/runnable/daemons/cic_user_server.py index 53f633f6..c8df1ccd 100644 --- a/apps/cic-ussd/cic_ussd/runnable/daemons/cic_user_server.py +++ b/apps/cic-ussd/cic_ussd/runnable/daemons/cic_user_server.py @@ -36,11 +36,8 @@ logg.debug('config loaded from {}:\n{}'.format(args.c, config)) # set up db data_source_name = dsn_from_config(config) SessionBase.connect(data_source_name, pool_size=int(config.get('DATABASE_POOL_SIZE')), debug=config.true('DATABASE_DEBUG')) -# create session for the life time of http request -SessionBase.session = SessionBase.create_session() -# handle requests from CICADA def application(env, start_response): """Loads python code for application to be accessible over web server :param env: Object containing server and request information @@ -55,19 +52,24 @@ def application(env, start_response): errors_headers = [('Content-Type', 'text/plain'), ('Content-Length', '0')] headers = [('Content-Type', 'text/plain')] + # create session for the life time of http request + session = SessionBase.create_session() + if get_request_endpoint(env) == '/pin': phone_number = get_query_parameters(env=env, query_name='phoneNumber') phone_number = quote_plus(phone_number) - response, message = process_pin_reset_requests(env=env, phone_number=phone_number) + response, message = process_pin_reset_requests(env=env, phone_number=phone_number, session=session) response_bytes, headers = define_response_with_content(headers=errors_headers, response=response) - SessionBase.session.close() + session.commit() + session.close() start_response(message, headers) return [response_bytes] # handle requests for locked accounts - response, message = process_locked_accounts_requests(env=env) + response, message = process_locked_accounts_requests(env=env, session=session) response_bytes, headers = define_response_with_content(headers=headers, response=response) start_response(message, headers) - SessionBase.session.close() + session.commit() + session.close() return [response_bytes] diff --git a/apps/cic-ussd/cic_ussd/runnable/daemons/cic_user_ussd_server.py b/apps/cic-ussd/cic_ussd/runnable/daemons/cic_user_ussd_server.py index 738127e8..984fb680 100644 --- a/apps/cic-ussd/cic_ussd/runnable/daemons/cic_user_ussd_server.py +++ b/apps/cic-ussd/cic_ussd/runnable/daemons/cic_user_ussd_server.py @@ -55,8 +55,6 @@ data_source_name = dsn_from_config(config) SessionBase.connect(data_source_name, pool_size=int(config.get('DATABASE_POOL_SIZE')), debug=config.true('DATABASE_DEBUG')) -# create session for the life time of http request -SessionBase.session = SessionBase.create_session() # set up translations i18n.load_path.append(config.get('APP_LOCALE_PATH')) @@ -143,6 +141,9 @@ def application(env, start_response): errors_headers = [('Content-Type', 'text/plain'), ('Content-Length', '0')] headers = [('Content-Type', 'text/plain')] + # create session for the life time of http request + session = SessionBase.create_session() + if get_request_method(env=env) == 'POST' and get_request_endpoint(env=env) == '/': if env.get('CONTENT_TYPE') != 'application/x-www-form-urlencoded': @@ -206,17 +207,20 @@ def application(env, start_response): phone_number=phone_number, queue=args.q, service_code=service_code, + session=session, user_input=user_input) response_bytes, headers = define_response_with_content(headers=headers, response=response) start_response('200 OK,', headers) - SessionBase.session.close() + session.commit() + session.close() return [response_bytes] else: logg.error('invalid query {}'.format(env)) for r in env: logg.debug('{}: {}'.format(r, env)) + session.close() start_response('405 Play by the rules', errors_headers) return [] diff --git a/apps/cic-ussd/cic_ussd/state_machine/logic/balance.py b/apps/cic-ussd/cic_ussd/state_machine/logic/balance.py index 899ff346..64f1dd61 100644 --- a/apps/cic-ussd/cic_ussd/state_machine/logic/balance.py +++ b/apps/cic-ussd/cic_ussd/state_machine/logic/balance.py @@ -3,6 +3,7 @@ import logging from typing import Tuple # third-party imports +from sqlalchemy.orm.session import Session # local imports from cic_ussd.db.models.account import Account @@ -10,11 +11,11 @@ from cic_ussd.db.models.account import Account logg = logging.getLogger(__file__) -def process_mini_statement_request(state_machine_data: Tuple[str, dict, Account]): +def process_mini_statement_request(state_machine_data: Tuple[str, dict, Account, Session]): """This function compiles a brief statement of a user's last three inbound and outbound transactions and send the same as a message on their selected avenue for notification. :param state_machine_data: A tuple containing user input, a ussd session and user object. :type state_machine_data: str """ - user_input, ussd_session, user = state_machine_data + user_input, ussd_session, user, session = state_machine_data logg.debug('This section requires integration with cic-eth. (The last 6 transactions would be sent as an sms.)') diff --git a/apps/cic-ussd/cic_ussd/state_machine/logic/menu.py b/apps/cic-ussd/cic_ussd/state_machine/logic/menu.py index a1889ce5..bac773b9 100644 --- a/apps/cic-ussd/cic_ussd/state_machine/logic/menu.py +++ b/apps/cic-ussd/cic_ussd/state_machine/logic/menu.py @@ -16,7 +16,7 @@ def menu_one_selected(state_machine_data: Tuple[str, dict, Account]) -> bool: :return: A user input's match with '1' :rtype: bool """ - user_input, ussd_session, user = state_machine_data + user_input, ussd_session, user, session = state_machine_data return user_input == '1' @@ -27,7 +27,7 @@ def menu_two_selected(state_machine_data: Tuple[str, dict, Account]) -> bool: :return: A user input's match with '2' :rtype: bool """ - user_input, ussd_session, user = state_machine_data + user_input, ussd_session, user, session = state_machine_data return user_input == '2' @@ -38,7 +38,7 @@ def menu_three_selected(state_machine_data: Tuple[str, dict, Account]) -> bool: :return: A user input's match with '3' :rtype: bool """ - user_input, ussd_session, user = state_machine_data + user_input, ussd_session, user, session = state_machine_data return user_input == '3' @@ -50,7 +50,7 @@ def menu_four_selected(state_machine_data: Tuple[str, dict, Account]) -> bool: :return: A user input's match with '4' :rtype: bool """ - user_input, ussd_session, user = state_machine_data + user_input, ussd_session, user, session = state_machine_data return user_input == '4' @@ -62,7 +62,7 @@ def menu_five_selected(state_machine_data: Tuple[str, dict, Account]) -> bool: :return: A user input's match with '5' :rtype: bool """ - user_input, ussd_session, user = state_machine_data + user_input, ussd_session, user, session = state_machine_data return user_input == '5' @@ -74,7 +74,7 @@ def menu_six_selected(state_machine_data: Tuple[str, dict, Account]) -> bool: :return: A user input's match with '6' :rtype: bool """ - user_input, ussd_session, user = state_machine_data + user_input, ussd_session, user, session = state_machine_data return user_input == '6' @@ -86,7 +86,7 @@ def menu_zero_zero_selected(state_machine_data: Tuple[str, dict, Account]) -> bo :return: A user input's match with '00' :rtype: bool """ - user_input, ussd_session, user = state_machine_data + user_input, ussd_session, user, session = state_machine_data return user_input == '00' @@ -98,5 +98,5 @@ def menu_ninety_nine_selected(state_machine_data: Tuple[str, dict, Account]) -> :return: A user input's match with '99' :rtype: bool """ - user_input, ussd_session, user = state_machine_data + user_input, ussd_session, user, session = state_machine_data return user_input == '99' diff --git a/apps/cic-ussd/cic_ussd/state_machine/logic/pin.py b/apps/cic-ussd/cic_ussd/state_machine/logic/pin.py index 5f51e602..9419d248 100644 --- a/apps/cic-ussd/cic_ussd/state_machine/logic/pin.py +++ b/apps/cic-ussd/cic_ussd/state_machine/logic/pin.py @@ -9,11 +9,13 @@ import re from typing import Tuple # third party imports -import bcrypt +from sqlalchemy.orm.session import Session # local imports -from cic_ussd.db.models.account import AccountStatus, Account -from cic_ussd.encoder import PasswordEncoder, create_password_hash, check_password_hash +from cic_ussd.db.models.account import Account +from cic_ussd.db.models.base import SessionBase +from cic_ussd.db.enum import AccountStatus +from cic_ussd.encoder import create_password_hash, check_password_hash from cic_ussd.operations import persist_session_to_db_task, create_or_update_session from cic_ussd.redis import InMemoryStore @@ -21,7 +23,7 @@ from cic_ussd.redis import InMemoryStore logg = logging.getLogger(__file__) -def is_valid_pin(state_machine_data: Tuple[str, dict, Account]) -> bool: +def is_valid_pin(state_machine_data: Tuple[str, dict, Account, Session]) -> bool: """This function checks a pin's validity by ensuring it has a length of for characters and the characters are numeric. :param state_machine_data: A tuple containing user input, a ussd session and user object. @@ -29,7 +31,7 @@ def is_valid_pin(state_machine_data: Tuple[str, dict, Account]) -> bool: :return: A pin's validity :rtype: bool """ - user_input, ussd_session, user = state_machine_data + user_input, ussd_session, user, session = state_machine_data pin_is_valid = False matcher = r'^\d{4}$' if re.match(matcher, user_input): @@ -37,34 +39,34 @@ def is_valid_pin(state_machine_data: Tuple[str, dict, Account]) -> bool: return pin_is_valid -def is_authorized_pin(state_machine_data: Tuple[str, dict, Account]) -> bool: +def is_authorized_pin(state_machine_data: Tuple[str, dict, Account, Session]) -> bool: """This function checks whether the user input confirming a specific pin matches the initial pin entered. :param state_machine_data: A tuple containing user input, a ussd session and user object. :type state_machine_data: tuple :return: A match between two pin values. :rtype: bool """ - user_input, ussd_session, user = state_machine_data + user_input, ussd_session, user, session = state_machine_data return user.verify_password(password=user_input) -def is_locked_account(state_machine_data: Tuple[str, dict, Account]) -> bool: +def is_locked_account(state_machine_data: Tuple[str, dict, Account, Session]) -> bool: """This function checks whether a user's account is locked due to too many failed attempts. :param state_machine_data: A tuple containing user input, a ussd session and user object. :type state_machine_data: tuple :return: A match between two pin values. :rtype: bool """ - user_input, ussd_session, user = state_machine_data + user_input, ussd_session, user, session = state_machine_data return user.get_account_status() == AccountStatus.LOCKED.name -def save_initial_pin_to_session_data(state_machine_data: Tuple[str, dict, Account]): +def save_initial_pin_to_session_data(state_machine_data: Tuple[str, dict, Account, Session]): """This function hashes a pin and stores it in session data. :param state_machine_data: A tuple containing user input, a ussd session and user object. :type state_machine_data: tuple """ - user_input, ussd_session, user = state_machine_data + user_input, ussd_session, user, session = state_machine_data # define redis cache entry point cache = InMemoryStore.cache @@ -93,54 +95,56 @@ def save_initial_pin_to_session_data(state_machine_data: Tuple[str, dict, Accoun service_code=in_redis_ussd_session.get('service_code'), user_input=user_input, current_menu=in_redis_ussd_session.get('state'), + session=session, session_data=session_data ) persist_session_to_db_task(external_session_id=external_session_id, queue='cic-ussd') -def pins_match(state_machine_data: Tuple[str, dict, Account]) -> bool: +def pins_match(state_machine_data: Tuple[str, dict, Account, Session]) -> bool: """This function checks whether the user input confirming a specific pin matches the initial pin entered. :param state_machine_data: A tuple containing user input, a ussd session and user object. :type state_machine_data: tuple :return: A match between two pin values. :rtype: bool """ - user_input, ussd_session, user = state_machine_data + user_input, ussd_session, user, session = state_machine_data initial_pin = ussd_session.get('session_data').get('initial_pin') - logg.debug(f'USSD SESSION: {ussd_session}') return check_password_hash(user_input, initial_pin) -def complete_pin_change(state_machine_data: Tuple[str, dict, Account]): +def complete_pin_change(state_machine_data: Tuple[str, dict, Account, Session]): """This function persists the user's pin to the database :param state_machine_data: A tuple containing user input, a ussd session and user object. :type state_machine_data: tuple """ - user_input, ussd_session, user = state_machine_data + user_input, ussd_session, user, session = state_machine_data + session = SessionBase.bind_session(session=session) password_hash = ussd_session.get('session_data').get('initial_pin') user.password_hash = password_hash - Account.session.add(user) - Account.session.commit() + session.add(user) + session.flush() + SessionBase.release_session(session=session) -def is_blocked_pin(state_machine_data: Tuple[str, dict, Account]) -> bool: +def is_blocked_pin(state_machine_data: Tuple[str, dict, Account, Session]) -> bool: """This function checks whether the user input confirming a specific pin matches the initial pin entered. :param state_machine_data: A tuple containing user input, a ussd session and user object. :type state_machine_data: tuple :return: A match between two pin values. :rtype: bool """ - user_input, ussd_session, user = state_machine_data + user_input, ussd_session, user, session = state_machine_data return user.get_account_status() == AccountStatus.LOCKED.name -def is_valid_new_pin(state_machine_data: Tuple[str, dict, Account]) -> bool: +def is_valid_new_pin(state_machine_data: Tuple[str, dict, Account, Session]) -> bool: """This function checks whether the user's new pin is a valid pin and that it isn't the same as the old one. :param state_machine_data: A tuple containing user input, a ussd session and user object. :type state_machine_data: tuple :return: A match between two pin values. :rtype: bool """ - user_input, ussd_session, user = state_machine_data + user_input, ussd_session, user, session = state_machine_data is_old_pin = user.verify_password(password=user_input) return is_valid_pin(state_machine_data=state_machine_data) and not is_old_pin diff --git a/apps/cic-ussd/cic_ussd/state_machine/logic/sms.py b/apps/cic-ussd/cic_ussd/state_machine/logic/sms.py index 517c09e9..6f33a555 100644 --- a/apps/cic-ussd/cic_ussd/state_machine/logic/sms.py +++ b/apps/cic-ussd/cic_ussd/state_machine/logic/sms.py @@ -9,15 +9,15 @@ logg = logging.getLogger() def send_terms_to_user_if_required(state_machine_data: Tuple[str, dict, Account]): - user_input, ussd_session, user = state_machine_data + user_input, ussd_session, user, session = state_machine_data logg.debug('Requires integration to cic-notify.') def process_mini_statement_request(state_machine_data: Tuple[str, dict, Account]): - user_input, ussd_session, user = state_machine_data + user_input, ussd_session, user, session = state_machine_data logg.debug('Requires integration to cic-notify.') def upsell_unregistered_recipient(state_machine_data: Tuple[str, dict, Account]): - user_input, ussd_session, user = state_machine_data + user_input, ussd_session, user, session = state_machine_data logg.debug('Requires integration to cic-notify.') \ No newline at end of file diff --git a/apps/cic-ussd/cic_ussd/state_machine/logic/transaction.py b/apps/cic-ussd/cic_ussd/state_machine/logic/transaction.py index b8894103..c2ac5e45 100644 --- a/apps/cic-ussd/cic_ussd/state_machine/logic/transaction.py +++ b/apps/cic-ussd/cic_ussd/state_machine/logic/transaction.py @@ -5,13 +5,16 @@ from typing import Tuple # third party imports import celery +from sqlalchemy.orm.session import Session # local imports -from cic_ussd.balance import BalanceManager, compute_operational_balance +from cic_ussd.balance import compute_operational_balance from cic_ussd.chain import Chain -from cic_ussd.db.models.account import AccountStatus, Account +from cic_ussd.db.models.account import Account +from cic_ussd.db.models.base import SessionBase +from cic_ussd.db.enum import AccountStatus from cic_ussd.operations import save_to_in_memory_ussd_session_data -from cic_ussd.phone_number import get_user_by_phone_number, process_phone_number, E164Format +from cic_ussd.phone_number import process_phone_number, E164Format from cic_ussd.processor import retrieve_token_symbol from cic_ussd.redis import create_cached_data_key, get_cached_data from cic_ussd.transactions import OutgoingTransactionProcessor @@ -20,7 +23,7 @@ from cic_ussd.transactions import OutgoingTransactionProcessor logg = logging.getLogger(__file__) -def is_valid_recipient(state_machine_data: Tuple[str, dict, Account]) -> bool: +def is_valid_recipient(state_machine_data: Tuple[str, dict, Account, Session]) -> bool: """This function checks that a user exists, is not the initiator of the transaction, has an active account status and is authorized to perform standard transactions. :param state_machine_data: A tuple containing user input, a ussd session and user object. @@ -28,15 +31,17 @@ def is_valid_recipient(state_machine_data: Tuple[str, dict, Account]) -> bool: :return: A user's validity :rtype: bool """ - user_input, ussd_session, user = state_machine_data + user_input, ussd_session, user, session = state_machine_data phone_number = process_phone_number(user_input, E164Format.region) - recipient = get_user_by_phone_number(phone_number=user_input) + session = SessionBase.bind_session(session=session) + recipient = Account.get_by_phone_number(phone_number=phone_number, session=session) + SessionBase.release_session(session=session) is_not_initiator = phone_number != user.phone_number has_active_account_status = user.get_account_status() == AccountStatus.ACTIVE.name return is_not_initiator and has_active_account_status and recipient is not None -def is_valid_transaction_amount(state_machine_data: Tuple[str, dict, Account]) -> bool: +def is_valid_transaction_amount(state_machine_data: Tuple[str, dict, Account, Session]) -> bool: """This function checks that the transaction amount provided is valid as per the criteria for the transaction being attempted. :param state_machine_data: A tuple containing user input, a ussd session and user object. @@ -44,14 +49,14 @@ def is_valid_transaction_amount(state_machine_data: Tuple[str, dict, Account]) - :return: A transaction amount's validity :rtype: bool """ - user_input, ussd_session, user = state_machine_data + user_input, ussd_session, user, session = state_machine_data try: return int(user_input) > 0 except ValueError: return False -def has_sufficient_balance(state_machine_data: Tuple[str, dict, Account]) -> bool: +def has_sufficient_balance(state_machine_data: Tuple[str, dict, Account, Session]) -> bool: """This function checks that the transaction amount provided is valid as per the criteria for the transaction being attempted. :param state_machine_data: A tuple containing user input, a ussd session and user object. @@ -59,10 +64,7 @@ def has_sufficient_balance(state_machine_data: Tuple[str, dict, Account]) -> boo :return: An account balance's validity :rtype: bool """ - user_input, ussd_session, user = state_machine_data - balance_manager = BalanceManager(address=user.blockchain_address, - chain_str=Chain.spec.__str__(), - token_symbol='SRF') + user_input, ussd_session, user, session = state_machine_data # get cached balance key = create_cached_data_key( identifier=bytes.fromhex(user.blockchain_address[2:]), @@ -74,30 +76,37 @@ def has_sufficient_balance(state_machine_data: Tuple[str, dict, Account]) -> boo return int(user_input) <= operational_balance -def save_recipient_phone_to_session_data(state_machine_data: Tuple[str, dict, Account]): +def save_recipient_phone_to_session_data(state_machine_data: Tuple[str, dict, Account, Session]): """This function saves the phone number corresponding the intended recipients blockchain account. :param state_machine_data: A tuple containing user input, a ussd session and user object. :type state_machine_data: str """ - user_input, ussd_session, user = state_machine_data + user_input, ussd_session, user, session = state_machine_data session_data = ussd_session.get('session_data') or {} - session_data['recipient_phone_number'] = user_input + recipient_phone_number = process_phone_number(phone_number=user_input, region=E164Format.region) + session_data['recipient_phone_number'] = recipient_phone_number - save_to_in_memory_ussd_session_data(queue='cic-ussd', session_data=session_data, ussd_session=ussd_session) + save_to_in_memory_ussd_session_data( + queue='cic-ussd', + session=session, + session_data=session_data, + ussd_session=ussd_session) -def retrieve_recipient_metadata(state_machine_data: Tuple[str, dict, Account]): +def retrieve_recipient_metadata(state_machine_data: Tuple[str, dict, Account, Session]): """ :param state_machine_data: :type state_machine_data: :return: :rtype: """ - user_input, ussd_session, user = state_machine_data + user_input, ussd_session, user, session = state_machine_data - recipient = get_user_by_phone_number(phone_number=user_input) + recipient_phone_number = process_phone_number(phone_number=user_input, region=E164Format.region) + recipient = Account.get_by_phone_number(phone_number=recipient_phone_number, session=session) blockchain_address = recipient.blockchain_address + # retrieve and cache account's metadata s_query_person_metadata = celery.signature( 'cic_ussd.tasks.metadata.query_person_metadata', @@ -106,32 +115,36 @@ def retrieve_recipient_metadata(state_machine_data: Tuple[str, dict, Account]): s_query_person_metadata.apply_async(queue='cic-ussd') -def save_transaction_amount_to_session_data(state_machine_data: Tuple[str, dict, Account]): +def save_transaction_amount_to_session_data(state_machine_data: Tuple[str, dict, Account, Session]): """This function saves the phone number corresponding the intended recipients blockchain account. :param state_machine_data: A tuple containing user input, a ussd session and user object. :type state_machine_data: str """ - user_input, ussd_session, user = state_machine_data + user_input, ussd_session, user, session = state_machine_data session_data = ussd_session.get('session_data') or {} session_data['transaction_amount'] = user_input - save_to_in_memory_ussd_session_data(queue='cic-ussd', session_data=session_data, ussd_session=ussd_session) + save_to_in_memory_ussd_session_data( + queue='cic-ussd', + session=session, + session_data=session_data, + ussd_session=ussd_session) -def process_transaction_request(state_machine_data: Tuple[str, dict, Account]): +def process_transaction_request(state_machine_data: Tuple[str, dict, Account, Session]): """This function saves the phone number corresponding the intended recipients blockchain account. :param state_machine_data: A tuple containing user input, a ussd session and user object. :type state_machine_data: str """ - user_input, ussd_session, user = state_machine_data + user_input, ussd_session, user, session = state_machine_data # retrieve token symbol chain_str = Chain.spec.__str__() # get user from phone number recipient_phone_number = ussd_session.get('session_data').get('recipient_phone_number') - recipient = get_user_by_phone_number(phone_number=recipient_phone_number) + recipient = Account.get_by_phone_number(phone_number=recipient_phone_number, session=session) to_address = recipient.blockchain_address from_address = user.blockchain_address amount = int(ussd_session.get('session_data').get('transaction_amount')) diff --git a/apps/cic-ussd/cic_ussd/state_machine/logic/user.py b/apps/cic-ussd/cic_ussd/state_machine/logic/user.py index d9218845..7c01f971 100644 --- a/apps/cic-ussd/cic_ussd/state_machine/logic/user.py +++ b/apps/cic-ussd/cic_ussd/state_machine/logic/user.py @@ -5,12 +5,14 @@ from typing import Tuple # third-party imports import celery -from cic_types.models.person import Person, generate_metadata_pointer +from cic_types.models.person import generate_metadata_pointer from cic_types.models.person import generate_vcard_from_contact_data, manage_identity_data +from sqlalchemy.orm.session import Session # local imports from cic_ussd.chain import Chain from cic_ussd.db.models.account import Account +from cic_ussd.db.models.base import SessionBase from cic_ussd.error import MetadataNotFoundError from cic_ussd.metadata import blockchain_address_to_metadata_pointer from cic_ussd.operations import save_to_in_memory_ussd_session_data @@ -19,15 +21,17 @@ from cic_ussd.redis import get_cached_data logg = logging.getLogger(__file__) -def change_preferred_language_to_en(state_machine_data: Tuple[str, dict, Account]): +def change_preferred_language_to_en(state_machine_data: Tuple[str, dict, Account, Session]): """This function changes the user's preferred language to english. :param state_machine_data: A tuple containing user input, a ussd session and user object. :type state_machine_data: tuple """ - user_input, ussd_session, user = state_machine_data + user_input, ussd_session, user, session = state_machine_data + session = SessionBase.bind_session(session=session) user.preferred_language = 'en' - Account.session.add(user) - Account.session.commit() + session.add(user) + session.flush() + SessionBase.release_session(session=session) preferences_data = { 'preferred_language': 'en' @@ -40,15 +44,17 @@ def change_preferred_language_to_en(state_machine_data: Tuple[str, dict, Account s.apply_async(queue='cic-ussd') -def change_preferred_language_to_sw(state_machine_data: Tuple[str, dict, Account]): +def change_preferred_language_to_sw(state_machine_data: Tuple[str, dict, Account, Session]): """This function changes the user's preferred language to swahili. :param state_machine_data: A tuple containing user input, a ussd session and user object. :type state_machine_data: tuple """ - user_input, ussd_session, user = state_machine_data - user.preferred_language = 'sw' - Account.session.add(user) - Account.session.commit() + user_input, ussd_session, account, session = state_machine_data + session = SessionBase.bind_session(session=session) + account.preferred_language = 'sw' + session.add(account) + session.flush() + SessionBase.release_session(session=session) preferences_data = { 'preferred_language': 'sw' @@ -56,20 +62,22 @@ def change_preferred_language_to_sw(state_machine_data: Tuple[str, dict, Account s = celery.signature( 'cic_ussd.tasks.metadata.add_preferences_metadata', - [user.blockchain_address, preferences_data] + [account.blockchain_address, preferences_data] ) s.apply_async(queue='cic-ussd') -def update_account_status_to_active(state_machine_data: Tuple[str, dict, Account]): +def update_account_status_to_active(state_machine_data: Tuple[str, dict, Account, Session]): """This function sets user's account to active. :param state_machine_data: A tuple containing user input, a ussd session and user object. :type state_machine_data: tuple """ - user_input, ussd_session, user = state_machine_data - user.activate_account() - Account.session.add(user) - Account.session.commit() + user_input, ussd_session, account, session = state_machine_data + session = SessionBase.bind_session(session=session) + account.activate_account() + session.add(account) + session.flush() + SessionBase.release_session(session=session) def process_gender_user_input(user: Account, user_input: str): @@ -81,6 +89,7 @@ def process_gender_user_input(user: Account, user_input: str): :return: :rtype: """ + gender = "" if user.preferred_language == 'en': if user_input == '1': gender = 'Male' @@ -98,13 +107,13 @@ def process_gender_user_input(user: Account, user_input: str): return gender -def save_metadata_attribute_to_session_data(state_machine_data: Tuple[str, dict, Account]): +def save_metadata_attribute_to_session_data(state_machine_data: Tuple[str, dict, Account, Session]): """This function saves first name data to the ussd session in the redis cache. :param state_machine_data: A tuple containing user input, a ussd session and user object. :type state_machine_data: tuple """ - user_input, ussd_session, user = state_machine_data - + user_input, ussd_session, user, session = state_machine_data + session = SessionBase.bind_session(session=session) # get current menu current_state = ussd_session.get('state') @@ -137,7 +146,11 @@ def save_metadata_attribute_to_session_data(state_machine_data: Tuple[str, dict, session_data = { key: user_input } - save_to_in_memory_ussd_session_data(queue='cic-ussd', session_data=session_data, ussd_session=ussd_session) + save_to_in_memory_ussd_session_data( + queue='cic-ussd', + session=session, + session_data=session_data, + ussd_session=ussd_session) def format_user_metadata(metadata: dict, user: Account): @@ -197,12 +210,12 @@ def format_user_metadata(metadata: dict, user: Account): } -def save_complete_user_metadata(state_machine_data: Tuple[str, dict, Account]): +def save_complete_user_metadata(state_machine_data: Tuple[str, dict, Account, Session]): """This function persists elements of the user metadata stored in session data :param state_machine_data: A tuple containing user input, a ussd session and user object. :type state_machine_data: tuple """ - user_input, ussd_session, user = state_machine_data + user_input, ussd_session, user, session = state_machine_data # get session data metadata = ussd_session.get('session_data') @@ -218,8 +231,8 @@ def save_complete_user_metadata(state_machine_data: Tuple[str, dict, Account]): s_create_person_metadata.apply_async(queue='cic-ussd') -def edit_user_metadata_attribute(state_machine_data: Tuple[str, dict, Account]): - user_input, ussd_session, user = state_machine_data +def edit_user_metadata_attribute(state_machine_data: Tuple[str, dict, Account, Session]): + user_input, ussd_session, user, session = state_machine_data blockchain_address = user.blockchain_address key = generate_metadata_pointer( identifier=blockchain_address_to_metadata_pointer(blockchain_address=user.blockchain_address), @@ -269,8 +282,8 @@ def edit_user_metadata_attribute(state_machine_data: Tuple[str, dict, Account]): s_edit_person_metadata.apply_async(queue='cic-ussd') -def get_user_metadata(state_machine_data: Tuple[str, dict, Account]): - user_input, ussd_session, user = state_machine_data +def get_user_metadata(state_machine_data: Tuple[str, dict, Account, Session]): + user_input, ussd_session, user, session = state_machine_data blockchain_address = user.blockchain_address s_get_user_metadata = celery.signature( 'cic_ussd.tasks.metadata.query_person_metadata', diff --git a/apps/cic-ussd/cic_ussd/state_machine/logic/validator.py b/apps/cic-ussd/cic_ussd/state_machine/logic/validator.py index 2af1e8d3..80747042 100644 --- a/apps/cic-ussd/cic_ussd/state_machine/logic/validator.py +++ b/apps/cic-ussd/cic_ussd/state_machine/logic/validator.py @@ -19,7 +19,7 @@ def has_cached_user_metadata(state_machine_data: Tuple[str, dict, Account]): :param state_machine_data: A tuple containing user input, a ussd session and user object. :type state_machine_data: str """ - user_input, ussd_session, user = state_machine_data + user_input, ussd_session, user, session = state_machine_data # check for user metadata in cache key = generate_metadata_pointer( identifier=blockchain_address_to_metadata_pointer(blockchain_address=user.blockchain_address), @@ -34,7 +34,7 @@ def is_valid_name(state_machine_data: Tuple[str, dict, Account]): :param state_machine_data: A tuple containing user input, a ussd session and user object. :type state_machine_data: str """ - user_input, ussd_session, user = state_machine_data + user_input, ussd_session, user, session = state_machine_data name_matcher = "^[a-zA-Z]+$" valid_name = re.match(name_matcher, user_input) if valid_name: @@ -50,7 +50,7 @@ def is_valid_gender_selection(state_machine_data: Tuple[str, dict, Account]): :return: :rtype: """ - user_input, ussd_session, user = state_machine_data + user_input, ussd_session, user, session = state_machine_data selection_matcher = "^[1-2]$" if re.match(selection_matcher, user_input): return True @@ -65,6 +65,6 @@ def is_valid_date(state_machine_data: Tuple[str, dict, Account]): :return: :rtype: """ - user_input, ussd_session, user = state_machine_data + user_input, ussd_session, user, session = state_machine_data # For MVP this value is defaulting to year return len(user_input) == 4 and int(user_input) >= 1900 diff --git a/apps/cic-ussd/cic_ussd/validator.py b/apps/cic-ussd/cic_ussd/validator.py index 660f2783..a1706f9a 100644 --- a/apps/cic-ussd/cic_ussd/validator.py +++ b/apps/cic-ussd/cic_ussd/validator.py @@ -9,6 +9,7 @@ from confini import Config # local imports from cic_ussd.db.models.account import Account +from cic_ussd.db.models.base import SessionBase logg = logging.getLogger(__file__) @@ -45,29 +46,21 @@ def check_request_content_length(config: Config, env: dict): config.get('APP_MAX_BODY_LENGTH')) -def check_known_user(phone: str): - """ - This method attempts to ascertain whether the user already exists and is known to the system. +def check_known_user(phone_number: str, session): + """This method attempts to ascertain whether the user already exists and is known to the system. It sends a get request to the platform application and attempts to retrieve the user's data which it persists in memory. - :param phone: A valid phone number - :type phone: str + :param phone_number: A valid phone number + :type phone_number: str + :param session: + :type session: :return: Is known phone number :rtype: boolean """ - user = Account.session.query(Account).filter_by(phone_number=phone).first() - return user is not None - - -def check_phone_number(number: str): - """ - Checks whether phone number is present - :param number: A valid phone number - :type number: str - :return: Phone number presence - :rtype: boolean - """ - return number is not None + session = SessionBase.bind_session(session=session) + account = session.query(Account).filter_by(phone_number=phone_number).first() + SessionBase.release_session(session=session) + return account is not None def check_request_method(env: dict):