Merge branch 'philip/separate-telco-cicada-http-apis' into 'master'

Philip/separate telco cicada http apis

See merge request grassrootseconomics/cic-internal-integration!124
This commit is contained in:
Philip Wafula 2021-04-30 08:15:03 +00:00
commit 850dd15451
15 changed files with 209 additions and 94 deletions

View File

@ -48,10 +48,9 @@ def define_response_with_content(headers: list, response: str) -> tuple:
content_length_header = ('Content-Length', str(content_length)) content_length_header = ('Content-Length', str(content_length))
# check for content length defaulted to zero in error headers # check for content length defaulted to zero in error headers
for position, header in enumerate(headers): for position, header in enumerate(headers):
if header[0] == 'Content-Length': if 'Content-Length' in header:
headers[position] = content_length_header headers.pop(position)
else: headers.append(content_length_header)
headers.append(content_length_header)
return response_bytes, headers return response_bytes, headers

View File

@ -73,7 +73,7 @@ def process_exit_insufficient_balance(display_key: str, user: Account, ussd_sess
# compile response data # compile response data
user_input = ussd_session.get('user_input').split('*')[-1] user_input = ussd_session.get('user_input').split('*')[-1]
transaction_amount = to_wei(value=int(user_input)) transaction_amount = to_wei(value=int(user_input))
token_symbol = 'SRF' token_symbol = 'GFT'
recipient_phone_number = ussd_session.get('session_data').get('recipient_phone_number') recipient_phone_number = ussd_session.get('session_data').get('recipient_phone_number')
recipient = get_user_by_phone_number(phone_number=recipient_phone_number) recipient = get_user_by_phone_number(phone_number=recipient_phone_number)
@ -102,7 +102,7 @@ def process_exit_successful_transaction(display_key: str, user: Account, ussd_se
:rtype: str :rtype: str
""" """
transaction_amount = to_wei(int(ussd_session.get('session_data').get('transaction_amount'))) transaction_amount = to_wei(int(ussd_session.get('session_data').get('transaction_amount')))
token_symbol = 'SRF' token_symbol = 'GFT'
recipient_phone_number = ussd_session.get('session_data').get('recipient_phone_number') recipient_phone_number = ussd_session.get('session_data').get('recipient_phone_number')
recipient = get_user_by_phone_number(phone_number=recipient_phone_number) recipient = get_user_by_phone_number(phone_number=recipient_phone_number)
tx_recipient_information = define_account_tx_metadata(user=recipient) tx_recipient_information = define_account_tx_metadata(user=recipient)
@ -137,7 +137,7 @@ def process_transaction_pin_authorization(user: Account, display_key: str, ussd_
tx_recipient_information = define_account_tx_metadata(user=recipient) tx_recipient_information = define_account_tx_metadata(user=recipient)
tx_sender_information = define_account_tx_metadata(user=user) tx_sender_information = define_account_tx_metadata(user=user)
token_symbol = 'SRF' token_symbol = 'GFT'
user_input = ussd_session.get('session_data').get('transaction_amount') user_input = ussd_session.get('session_data').get('transaction_amount')
transaction_amount = to_wei(value=int(user_input)) transaction_amount = to_wei(value=int(user_input))
logg.debug('Requires integration to determine user tokens.') logg.debug('Requires integration to determine user tokens.')
@ -175,7 +175,7 @@ def process_account_balances(user: Account, display_key: str, ussd_session: dict
operational_balance=operational_balance, operational_balance=operational_balance,
tax=tax, tax=tax,
bonus=bonus, bonus=bonus,
token_symbol='SRF' token_symbol='GFT'
) )
@ -190,7 +190,7 @@ def format_transactions(transactions: list, preferred_language: str):
timestamp = transaction.get('timestamp') timestamp = transaction.get('timestamp')
action_tag = transaction.get('action_tag') action_tag = transaction.get('action_tag')
direction = transaction.get('direction') direction = transaction.get('direction')
token_symbol = 'SRF' token_symbol = 'GFT'
if action_tag == 'SENT' or action_tag == 'ULITUMA': if action_tag == 'SENT' or action_tag == 'ULITUMA':
formatted_transactions += f'{action_tag} {value} {token_symbol} {direction} {recipient_phone_number} {timestamp}.\n' formatted_transactions += f'{action_tag} {value} {token_symbol} {direction} {recipient_phone_number} {timestamp}.\n'
@ -316,7 +316,7 @@ def process_start_menu(display_key: str, user: Account):
blockchain_address = user.blockchain_address blockchain_address = user.blockchain_address
balance_manager = BalanceManager(address=blockchain_address, balance_manager = BalanceManager(address=blockchain_address,
chain_str=chain_str, chain_str=chain_str,
token_symbol='SRF') token_symbol='GFT')
# get balances synchronously for display on start menu # get balances synchronously for display on start menu
balances_data = balance_manager.get_balances() balances_data = balance_manager.get_balances()
@ -341,7 +341,7 @@ def process_start_menu(display_key: str, user: Account):
retrieve_account_statement(blockchain_address=blockchain_address) retrieve_account_statement(blockchain_address=blockchain_address)
# TODO [Philip]: figure out how to get token symbol from a metadata layer of sorts. # TODO [Philip]: figure out how to get token symbol from a metadata layer of sorts.
token_symbol = 'SRF' token_symbol = 'GFT'
return translation_for( return translation_for(
key=display_key, key=display_key,
@ -382,18 +382,29 @@ def process_request(user_input: str, user: Account, ussd_session: Optional[dict]
successive_state = next_state(ussd_session=ussd_session, user=user, user_input=user_input) successive_state = next_state(ussd_session=ussd_session, user=user, user_input=user_input)
return UssdMenu.find_by_name(name=successive_state) return UssdMenu.find_by_name(name=successive_state)
else: else:
key = generate_metadata_pointer(
identifier=blockchain_address_to_metadata_pointer(blockchain_address=user.blockchain_address),
cic_type='cic.person'
)
# retrieve and cache account's metadata
s_query_person_metadata = celery.signature(
'cic_ussd.tasks.metadata.query_person_metadata',
[user.blockchain_address]
)
s_query_person_metadata.apply_async(queue='cic-ussd')
if user.has_valid_pin(): if user.has_valid_pin():
last_ussd_session = retrieve_most_recent_ussd_session(phone_number=user.phone_number) last_ussd_session = retrieve_most_recent_ussd_session(phone_number=user.phone_number)
key = generate_metadata_pointer(
identifier=blockchain_address_to_metadata_pointer(blockchain_address=user.blockchain_address),
cic_type='cic.person'
)
person_metadata = get_cached_data(key=key)
if last_ussd_session: if last_ussd_session:
# get metadata
person_metadata = get_cached_data(key=key)
# get last state # get last state
last_state = last_ussd_session.state last_state = last_ussd_session.state
# if last state is account_creation_prompt and metadata exists, show start menu # if last state is account_creation_prompt and metadata exists, show start menu
if last_state in [ if last_state in [
'account_creation_prompt', 'account_creation_prompt',

View File

@ -0,0 +1,73 @@
"""
This module handles requests originating from CICADA or any other management client for custodial wallets, processing
requests offering control of user account states to a staff behind the client.
"""
# standard imports
import logging
from urllib.parse import quote_plus
# third-party imports
from confini import Config
# local imports
from cic_ussd.db import dsn_from_config
from cic_ussd.db.models.base import SessionBase
from cic_ussd.operations import define_response_with_content
from cic_ussd.requests import (get_request_endpoint,
get_query_parameters,
process_pin_reset_requests,
process_locked_accounts_requests)
from cic_ussd.runnable.server_base import exportable_parser, logg
args = exportable_parser.parse_args()
# define log levels
if args.vv:
logging.getLogger().setLevel(logging.DEBUG)
elif args.v:
logging.getLogger().setLevel(logging.INFO)
# parse config
config = Config(config_dir=args.c, env_prefix=args.env_prefix)
config.process()
config.censor('PASSWORD', 'DATABASE')
logg.debug('config loaded from {}:\n{}'.format(args.c, config))
# set up db
data_source_name = dsn_from_config(config)
SessionBase.connect(data_source_name, pool_size=int(config.get('DATABASE_POOL_SIZE')), debug=config.true('DATABASE_DEBUG'))
# create session for the life time of http request
SessionBase.session = SessionBase.create_session()
# handle requests from CICADA
def application(env, start_response):
"""Loads python code for application to be accessible over web server
:param env: Object containing server and request information
:type env: dict
:param start_response: Callable to define responses.
:type start_response: any
:return: a list containing a bytes representation of the response object
:rtype: list
"""
# define headers
errors_headers = [('Content-Type', 'text/plain'), ('Content-Length', '0')]
headers = [('Content-Type', 'text/plain')]
if get_request_endpoint(env) == '/pin':
phone_number = get_query_parameters(env=env, query_name='phoneNumber')
phone_number = quote_plus(phone_number)
response, message = process_pin_reset_requests(env=env, phone_number=phone_number)
response_bytes, headers = define_response_with_content(headers=errors_headers, response=response)
SessionBase.session.close()
start_response(message, headers)
return [response_bytes]
# handle requests for locked accounts
response, message = process_locked_accounts_requests(env=env)
response_bytes, headers = define_response_with_content(headers=headers, response=response)
start_response(message, headers)
SessionBase.session.close()
return [response_bytes]

View File

@ -1,19 +1,16 @@
"""Functions defining WSGI interaction with external http requests """This module handles requests originating from the ussd service provider.
Defines an application function essential for the uWSGI python loader to run th python application code.
""" """
# standard imports # standard imports
import argparse
import celery
import i18n
import json import json
import logging import logging
import os
import redis
# third-party imports # third-party imports
from confini import Config import celery
import i18n
import redis
from chainlib.chain import ChainSpec from chainlib.chain import ChainSpec
from urllib.parse import quote_plus from confini import Config
# local imports # local imports
from cic_ussd.chain import Chain from cic_ussd.chain import Chain
@ -30,32 +27,14 @@ from cic_ussd.operations import (define_response_with_content,
from cic_ussd.phone_number import process_phone_number from cic_ussd.phone_number import process_phone_number
from cic_ussd.redis import InMemoryStore from cic_ussd.redis import InMemoryStore
from cic_ussd.requests import (get_request_endpoint, from cic_ussd.requests import (get_request_endpoint,
get_request_method, get_request_method)
get_query_parameters, from cic_ussd.runnable.server_base import exportable_parser, logg
process_locked_accounts_requests,
process_pin_reset_requests)
from cic_ussd.session.ussd_session import UssdSession as InMemoryUssdSession from cic_ussd.session.ussd_session import UssdSession as InMemoryUssdSession
from cic_ussd.state_machine import UssdStateMachine from cic_ussd.state_machine import UssdStateMachine
from cic_ussd.validator import check_ip, check_request_content_length, check_service_code, validate_phone_number, \ from cic_ussd.validator import check_ip, check_request_content_length, check_service_code, validate_phone_number, \
validate_presence validate_presence
logging.basicConfig(level=logging.WARNING) args = exportable_parser.parse_args()
logg = logging.getLogger()
config_directory = '/usr/local/etc/cic-ussd/'
# define arguments
arg_parser = argparse.ArgumentParser()
arg_parser.add_argument('-c', type=str, default=config_directory, help='config directory.')
arg_parser.add_argument('-q', type=str, default='cic-ussd', help='queue name for worker tasks')
arg_parser.add_argument('-v', action='store_true', help='be verbose')
arg_parser.add_argument('-vv', action='store_true', help='be more verbose')
arg_parser.add_argument('--env-prefix',
default=os.environ.get('CONFINI_ENV_PREFIX'),
dest='env_prefix',
type=str,
help='environment prefix for variables to overwrite configuration')
args = arg_parser.parse_args()
# define log levels # define log levels
if args.vv: if args.vv:
@ -69,7 +48,14 @@ config.process()
config.censor('PASSWORD', 'DATABASE') config.censor('PASSWORD', 'DATABASE')
logg.debug('config loaded from {}:\n{}'.format(args.c, config)) logg.debug('config loaded from {}:\n{}'.format(args.c, config))
# initialize elements # set up db
data_source_name = dsn_from_config(config)
SessionBase.connect(data_source_name,
pool_size=int(config.get('DATABASE_POOL_SIZE')),
debug=config.true('DATABASE_DEBUG'))
# create session for the life time of http request
SessionBase.session = SessionBase.create_session()
# set up translations # set up translations
i18n.load_path.append(config.get('APP_LOCALE_PATH')) i18n.load_path.append(config.get('APP_LOCALE_PATH'))
i18n.set('fallback', config.get('APP_LOCALE_FALLBACK')) i18n.set('fallback', config.get('APP_LOCALE_FALLBACK'))
@ -82,12 +68,6 @@ ussd_menu_db = create_local_file_data_stores(file_location=config.get('USSD_MENU
table_name='ussd_menu') table_name='ussd_menu')
UssdMenu.ussd_menu_db = ussd_menu_db UssdMenu.ussd_menu_db = ussd_menu_db
# set up db
data_source_name = dsn_from_config(config)
SessionBase.connect(data_source_name, pool_size=int(config.get('DATABASE_POOL_SIZE')), debug=config.true('DATABASE_DEBUG'))
# create session for the life time of http request
SessionBase.session = SessionBase.create_session()
# define universal redis cache access # define universal redis cache access
InMemoryStore.cache = redis.StrictRedis(host=config.get('REDIS_HOSTNAME'), InMemoryStore.cache = redis.StrictRedis(host=config.get('REDIS_HOSTNAME'),
port=config.get('REDIS_PORT'), port=config.get('REDIS_PORT'),
@ -134,6 +114,8 @@ def application(env, start_response):
:type env: dict :type env: dict
:param start_response: Callable to define responses. :param start_response: Callable to define responses.
:type start_response: any :type start_response: any
:return: a list containing a bytes representation of the response object
:rtype: list
""" """
# define headers # define headers
errors_headers = [('Content-Type', 'text/plain'), ('Content-Length', '0')] errors_headers = [('Content-Type', 'text/plain'), ('Content-Length', '0')]
@ -194,20 +176,3 @@ def application(env, start_response):
start_response('200 OK,', headers) start_response('200 OK,', headers)
SessionBase.session.close() SessionBase.session.close()
return [response_bytes] return [response_bytes]
# handle pin requests
if get_request_endpoint(env) == '/pin':
phone_number = get_query_parameters(env=env, query_name='phoneNumber')
phone_number = quote_plus(phone_number)
response, message = process_pin_reset_requests(env=env, phone_number=phone_number)
response_bytes, headers = define_response_with_content(headers=errors_headers, response=response)
SessionBase.session.close()
start_response(message, headers)
return [response_bytes]
# handle requests for locked accounts
response, message = process_locked_accounts_requests(env=env)
response_bytes, headers = define_response_with_content(headers=headers, response=response)
start_response(message, headers)
SessionBase.session.close()
return [response_bytes]

View File

@ -0,0 +1,38 @@
"""This module handles generic wsgi server configurations that can then be subsumed by different server flavors for the
cic-ussd component.
"""
# standard imports
import logging
import os
from argparse import ArgumentParser
# third-party imports
# local imports
# define a logging system
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
# define default config directory as would be defined in docker
default_config_dir = '/usr/local/etc/cic-ussd/'
# define args parser
arg_parser = ArgumentParser(description='CLI for handling cic-ussd server applications.')
arg_parser.add_argument('-c', type=str, default=default_config_dir, help='config root to use')
arg_parser.add_argument('-v', help='be verbose', action='store_true')
arg_parser.add_argument('-vv', help='be more verbose', action='store_true')
arg_parser.add_argument('-q', type=str, default='cic-ussd', help='queue name for worker tasks')
arg_parser.add_argument('--env-prefix',
default=os.environ.get('CONFINI_ENV_PREFIX'),
dest='env_prefix',
type=str,
help='environment prefix for variables to overwrite configuration')
exportable_parser = arg_parser

View File

@ -1,7 +1,7 @@
# standard imports # standard imports
import semver import semver
version = (0, 3, 0, 'alpha.9') version = (0, 3, 0, 'alpha.10')
version_object = semver.VersionInfo( version_object = semver.VersionInfo(
major=version[0], major=version[0],

View File

@ -38,8 +38,9 @@ COPY cic-ussd/transitions/ cic-ussd/transitions/
COPY cic-ussd/var/ cic-ussd/var/ COPY cic-ussd/var/ cic-ussd/var/
COPY cic-ussd/docker/db.sh \ COPY cic-ussd/docker/db.sh \
cic-ussd/docker/start_tasker.sh \ cic-ussd/docker/start_cic_user_tasker.sh \
cic-ussd/docker/start_uwsgi.sh \ cic-ussd/docker/start_cic_user_ussd_server.sh\
cic-ussd/docker/start_cic_user_server.sh\
/root/ /root/
RUN chmod +x /root/*.sh RUN chmod +x /root/*.sh

View File

@ -0,0 +1,7 @@
#!/bin/bash
. /root/db.sh
user_server_port=${SERVER_PORT:-9500}
/usr/local/bin/uwsgi --wsgi-file /usr/local/lib/python3.8/site-packages/cic_ussd/runnable/daemons/cic_user_server.py --http :"$user_server_port" --pyargv "$@"

View File

@ -0,0 +1,5 @@
#!/bin/bash
. /root/db.sh
/usr/local/bin/cic-user-tasker "$@"

View File

@ -0,0 +1,7 @@
#!/bin/bash
. /root/db.sh
user_ussd_server_port=${SERVER_PORT:-9000}
/usr/local/bin/uwsgi --wsgi-file /usr/local/lib/python3.8/site-packages/cic_ussd/runnable/daemons/cic_user_ussd_server.py --http :"$user_ussd_server_port" --pyargv "$@"

View File

@ -1,5 +0,0 @@
#!/bin/bash
. /root/db.sh
/usr/local/bin/cic-ussd-tasker $@

View File

@ -1,7 +0,0 @@
#!/bin/bash
. /root/db.sh
server_port=${SERVER_PORT:-9000}
/usr/local/bin/uwsgi --wsgi-file /usr/local/lib/python3.8/site-packages/cic_ussd/runnable/server.py --http :$server_port --pyargv "$@"

View File

@ -35,6 +35,7 @@ packages =
cic_ussd.menu cic_ussd.menu
cic_ussd.metadata cic_ussd.metadata
cic_ussd.runnable cic_ussd.runnable
cic_ussd.runnable.daemons
cic_ussd.session cic_ussd.session
cic_ussd.state_machine cic_ussd.state_machine
cic_ussd.state_machine.logic cic_ussd.state_machine.logic
@ -44,5 +45,5 @@ scripts =
[options.entry_points] [options.entry_points]
console_scripts = console_scripts =
cic-ussd-tasker = cic_ussd.runnable.tasker:main cic-user-tasker = cic_ussd.runnable.daemons.cic_user_tasker:main
cic-ussd-client = cic_ussd.runnable.client:main cic-ussd-client = cic_ussd.runnable.client:main

View File

@ -488,8 +488,7 @@ services:
- ${LOCAL_VOLUME_DIR:-/tmp/cic}/pgp:/tmp/cic/pgp - ${LOCAL_VOLUME_DIR:-/tmp/cic}/pgp:/tmp/cic/pgp
# command: "/root/start_server.sh -vv" # command: "/root/start_server.sh -vv"
cic-ussd-server: cic-user-ussd-server:
# image: grassrootseconomics:cic-ussd
build: build:
context: apps/ context: apps/
dockerfile: cic-ussd/docker/Dockerfile dockerfile: cic-ussd/docker/Dockerfile
@ -507,7 +506,7 @@ services:
SERVER_PORT: 9000 SERVER_PORT: 9000
CIC_META_URL: ${CIC_META_URL:-http://meta:8000} CIC_META_URL: ${CIC_META_URL:-http://meta:8000}
ports: ports:
- ${HTTP_PORT_CIC_USSD:-63315}:9000 - ${HTTP_PORT_CIC_USER_USSD_SERVER:-63315}:9000
depends_on: depends_on:
- postgres - postgres
- redis - redis
@ -516,10 +515,31 @@ services:
deploy: deploy:
restart_policy: restart_policy:
condition: on-failure condition: on-failure
command: "/root/start_uwsgi.sh -vv" command: "/root/start_cic_user_ussd_server.sh -vv"
cic-ussd-tasker: cic-user-server:
# image: grassrootseconomics:cic-ussd build:
context: apps
dockerfile: cic-ussd/docker/Dockerfile
environment:
DATABASE_USER: grassroots
DATABASE_HOST: postgres
DATABASE_PORT: 5432
DATABASE_PASSWORD: tralala
DATABASE_NAME: cic_ussd
DATABASE_ENGINE: postgresql
DATABASE_DRIVER: psycopg2
DATABASE_POOL_SIZE: 0
ports:
- ${HTTP_PORT_CIC_USER_SERVER:-63415}:9500
depends_on:
- postgres
deploy:
restart_policy:
condition: on-failure
command: "/root/start_cic_user_server.sh -vv"
cic-user-tasker:
build: build:
context: apps context: apps
dockerfile: cic-ussd/docker/Dockerfile dockerfile: cic-ussd/docker/Dockerfile
@ -544,4 +564,4 @@ services:
deploy: deploy:
restart_policy: restart_policy:
condition: on-failure condition: on-failure
command: "/root/start_tasker.sh -q cic-ussd -vv" command: "/root/start_cic_user_tasker.sh -q cic-ussd -vv"