Compare commits

...

16 Commits

Author SHA1 Message Date
Philip Wafula 3dcfbe59fe Merge branch 'philip/translations-v1' into 'master'
Adds updated translations.

See merge request grassrootseconomics/cic-internal-integration!328
2022-01-10 07:31:04 +00:00
Philip Wafula 0dd21f3970 Adds updated translations. 2022-01-10 07:31:03 +00:00
Philip Wafula 6ca8632cde Merge branch 'sohail/help-menu-fix' into 'master'
fix: ussd help menu

See merge request grassrootseconomics/cic-internal-integration!327
2022-01-10 06:56:52 +00:00
Mohamed Sohail b7dc290992 fix: ussd help menu 2022-01-10 06:56:52 +00:00
Philip Wafula e69801ea08 Merge branch 'philip/ussd-poler' into 'master'
Philip/ussd poler

See merge request grassrootseconomics/cic-internal-integration!326
2022-01-10 04:44:51 +00:00
Philip Wafula 50a596e707 Philip/ussd poler 2022-01-10 04:44:50 +00:00
Philip Wafula cb61e45e4c Merge branch 'philip/ussd-data-files' into 'master'
Move cic-ussd datafiles to apps/cic-ussd/cic_ussd/data/

See merge request grassrootseconomics/cic-internal-integration!325
2022-01-07 10:50:49 +00:00
Philip Wafula e5b06b18d7 Move cic-ussd datafiles to apps/cic-ussd/cic_ussd/data/ 2022-01-07 10:50:49 +00:00
William Luke 5d1a30021a Merge branch 'fix/docs-link' into 'master'
fix: link to docs

See merge request grassrootseconomics/cic-internal-integration!324
2022-01-05 12:22:15 +00:00
William Luke ade3f4e917 fix: link to docs 2022-01-05 15:19:11 +03:00
Mohamed Sohail c3e924ae8f Merge branch 'sohail/alt-build' into 'master'
feat (devops): add local image build for releases

See merge request grassrootseconomics/cic-internal-integration!322
2022-01-05 08:11:38 +00:00
Mohamed Sohail ff4c42dc24 feat (devops): add local image build for releases 2022-01-05 08:11:38 +00:00
Philip Wafula b8cd7eec56 Merge branch 'philip/bump-test-coverage' into 'master'
Rehabilitate test coverage in ussd and cic-notify

See merge request grassrootseconomics/cic-internal-integration!323
2022-01-04 16:51:02 +00:00
Philip Wafula 837e8da650 Rehabilitate test coverage in ussd and cic-notify 2022-01-04 16:51:02 +00:00
Philip Wafula fe3f2c2549 Merge branch 'philip/cleanup-hardening' into 'master'
USSD Hardening and Cleanups

See merge request grassrootseconomics/cic-internal-integration!320
2022-01-04 16:16:01 +00:00
Philip Wafula 46f25e5678 USSD Hardening and Cleanups 2022-01-04 16:16:00 +00:00
104 changed files with 3929 additions and 1564 deletions

View File

@ -15,5 +15,5 @@ To get started see [./apps/contract-migration/README.md](./apps/contract-migrati
## Documentation
[https://docs.grassecon.org/cic_stack/](https://docs.grassecon.org/cic_stack/)
[https://docs.grassecon.org/software/](https://docs.grassecon.org/software/)

View File

@ -17,7 +17,7 @@ from cic_eth_registry.error import UnknownContractError
# local imports
from cic_eth.error import SeppukuError
from cic_eth.db.models.base import SessionBase
from cic_eth.eth.util import CacheGasOracle
from cic_eth.eth.util import CacheGasOracle, MaxGasOracle
#logg = logging.getLogger().getChild(__name__)
logg = logging.getLogger()
@ -41,21 +41,24 @@ class BaseTask(celery.Task):
def create_gas_oracle(self, conn, address=None, *args, **kwargs):
if address == None:
return RPCGasOracle(
x = None
if address is None:
x = RPCGasOracle(
conn,
code_callback=kwargs.get('code_callback', self.get_min_fee_limit),
min_price=self.min_fee_price,
id_generator=kwargs.get('id_generator'),
)
else:
return CacheGasOracle(
conn,
address,
method=kwargs.get('method'),
min_price=self.min_fee_price,
id_generator=kwargs.get('id_generator'),
)
x = MaxGasOracle(conn)
x.code_callback = x.get_fee_units
return x
def get_min_fee_limit(self, code):
return self.min_fee_limit
def get_min_fee_limit(self, code):
@ -84,7 +87,7 @@ class BaseTask(celery.Task):
)
s.apply_async()
class CriticalTask(BaseTask):
retry_jitter = True
retry_backoff = True
@ -96,7 +99,7 @@ class CriticalSQLAlchemyTask(CriticalTask):
sqlalchemy.exc.DatabaseError,
sqlalchemy.exc.TimeoutError,
sqlalchemy.exc.ResourceClosedError,
)
)
class CriticalWeb3Task(CriticalTask):
@ -104,7 +107,7 @@ class CriticalWeb3Task(CriticalTask):
ConnectionError,
)
safe_gas_threshold_amount = 60000 * 3
safe_gas_refill_amount = safe_gas_threshold_amount * 5
safe_gas_refill_amount = safe_gas_threshold_amount * 5
safe_gas_gifter_balance = safe_gas_threshold_amount * 5 * 100
@ -122,13 +125,13 @@ class CriticalSQLAlchemyAndSignerTask(CriticalTask):
sqlalchemy.exc.DatabaseError,
sqlalchemy.exc.TimeoutError,
sqlalchemy.exc.ResourceClosedError,
)
)
class CriticalWeb3AndSignerTask(CriticalWeb3Task):
autoretry_for = (
ConnectionError,
)
@celery_app.task()
def check_health(self):
pass

View File

@ -1,10 +1,9 @@
[DATABASE]
user = postgres
password =
host = localhost
port = 5432
name = /tmp/cic-notify.db
#engine = postgresql
#driver = psycopg2
engine = sqlite
driver = pysqlite
[database]
name=cic_notify_test
user=
password=
host=localhost
port=
engine=sqlite
driver=pysqlite
debug=0

View File

@ -0,0 +1,7 @@
[report]
omit =
venv/*
scripts/*
cic_notify/db/migrations/*
cic_notify/runnable/*
cic_notify/version.py

View File

@ -3,6 +3,7 @@ import logging
import re
# third-party imports
import cic_notify.tasks.sms.db
from celery.app.control import Inspect
import celery
@ -13,45 +14,16 @@ app = celery.current_app
logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger()
sms_tasks_matcher = r"^(cic_notify.tasks.sms)(\.\w+)?"
re_q = r'^cic-notify'
def get_sms_queue_tasks(app, task_prefix='cic_notify.tasks.sms.'):
host_queues = []
i = Inspect(app=app)
qs = i.active_queues()
for host in qs.keys():
for q in qs[host]:
if re.match(re_q, q['name']):
host_queues.append((host, q['name'],))
task_prefix_len = len(task_prefix)
queue_tasks = []
for (host, queue) in host_queues:
i = Inspect(app=app, destination=[host])
for tasks in i.registered_tasks().values():
for task in tasks:
if len(task) >= task_prefix_len and task[:task_prefix_len] == task_prefix:
queue_tasks.append((queue, task,))
return queue_tasks
class Api:
# TODO: Implement callback strategy
def __init__(self, queue=None):
def __init__(self, queue: any = 'cic-notify'):
"""
:param queue: The queue on which to execute notification tasks
:type queue: str
"""
self.queue = queue
self.sms_tasks = get_sms_queue_tasks(app)
logg.debug('sms tasks {}'.format(self.sms_tasks))
def sms(self, message, recipient):
def sms(self, message: str, recipient: str):
"""This function chains all sms tasks in order to send a message, log and persist said data to disk
:param message: The message to be sent to the recipient.
:type message: str
@ -60,24 +32,9 @@ class Api:
:return: a celery Task
:rtype: Celery.Task
"""
signatures = []
for q in self.sms_tasks:
if not self.queue:
queue = q[0]
else:
queue = self.queue
signature = celery.signature(
q[1],
[
message,
recipient,
],
queue=queue,
)
signatures.append(signature)
t = celery.group(signatures)()
return t
s_send = celery.signature('cic_notify.tasks.sms.africastalking.send', [message, recipient], queue=self.queue)
s_log = celery.signature('cic_notify.tasks.sms.log.log', [message, recipient], queue=self.queue)
s_persist_notification = celery.signature(
'cic_notify.tasks.sms.db.persist_notification', [message, recipient], queue=self.queue)
signatures = [s_send, s_log, s_persist_notification]
return celery.group(signatures)()

View File

@ -2,7 +2,7 @@
[alembic]
# path to migration scripts
script_location = migrations
script_location = .
# template used to generate migration files
# file_template = %%(rev)s_%%(slug)s
@ -27,28 +27,17 @@ script_location = migrations
# sourceless = false
# version location specification; this defaults
# to migrations/versions. When using multiple version
# to ./versions. When using multiple version
# directories, initial revisions must be specified with --version-path
# version_locations = %(here)s/bar %(here)s/bat migrations/versions
# version_locations = %(here)s/bar %(here)s/bat ./versions
# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8
sqlalchemy.url = postgres+psycopg2://postgres@localhost/cic-notify
sqlalchemy.url = driver://user:pass@localhost/dbname
[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples
# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks=black
# black.type=console_scripts
# black.entrypoint=black
# black.options=-l 79
# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic

View File

@ -11,7 +11,7 @@ config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
fileConfig(config.config_file_name)
fileConfig(config.config_file_name, disable_existing_loggers=True)
# add your model's MetaData object here
# for 'autogenerate' support
@ -56,11 +56,14 @@ def run_migrations_online():
and associate a connection with the context.
"""
connectable = engine_from_config(
config.get_section(config.config_ini_section),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
connectable = context.config.attributes.get("connection", None)
if connectable is None:
connectable = engine_from_config(
context.config.get_section(context.config.config_ini_section),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(

View File

@ -7,7 +7,7 @@ import celery
celery_app = celery.current_app
logg = celery_app.log.get_default_logger()
local_logg = logging.getLogger(__name__)
local_logg = logging.getLogger()
@celery_app.task

View File

@ -1,5 +1,9 @@
pytest~=6.0.1
pytest-celery~=0.0.0a1
pytest-mock~=3.3.1
pysqlite3~=0.4.3
pytest-cov==2.10.1
Faker==11.1.0
faker-e164==0.1.0
pytest==6.2.5
pytest-celery~=0.0.0
pytest-mock==3.6.1
pysqlite3~=0.4.6
pytest-cov==3.0.0
pytest-alembic==0.7.0
requests-mock==1.9.3

View File

View File

@ -0,0 +1,28 @@
import pytest
def test_single_head_revision(alembic_runner):
heads = alembic_runner.heads
head_count = len(heads)
assert head_count == 1
def test_upgrade(alembic_runner):
try:
alembic_runner.migrate_up_to("head")
except RuntimeError:
pytest.fail('Failed to upgrade to the head revision.')
def test_up_down_consistency(alembic_runner):
try:
for revision in alembic_runner.history.revisions:
alembic_runner.migrate_up_to(revision)
except RuntimeError:
pytest.fail('Failed to upgrade through each revision individually.')
try:
for revision in reversed(alembic_runner.history.revisions):
alembic_runner.migrate_down_to(revision)
except RuntimeError:
pytest.fail('Failed to downgrade through each revision individually.')

View File

@ -0,0 +1,27 @@
# standard imports
# external imports
from faker import Faker
from faker_e164.providers import E164Provider
# local imports
from cic_notify.db.enum import NotificationStatusEnum, NotificationTransportEnum
from cic_notify.db.models.notification import Notification
# test imports
from tests.helpers.phone import phone_number
def test_notification(init_database):
message = 'Hello world'
recipient = phone_number()
notification = Notification(NotificationTransportEnum.SMS, recipient, message)
init_database.add(notification)
init_database.commit()
notification = init_database.query(Notification).get(1)
assert notification.status == NotificationStatusEnum.UNKNOWN
assert notification.recipient == recipient
assert notification.message == message
assert notification.transport == NotificationTransportEnum.SMS

View File

@ -0,0 +1,38 @@
# standard imports
import os
# third-party imports
# local imports
from cic_notify.db import dsn_from_config
def test_dsn_from_config(load_config):
"""
"""
# test dsn for other db formats
overrides = {
'DATABASE_PASSWORD': 'password',
'DATABASE_DRIVER': 'psycopg2',
'DATABASE_ENGINE': 'postgresql'
}
load_config.dict_override(dct=overrides, dct_description='Override values to test different db formats.')
scheme = f'{load_config.get("DATABASE_ENGINE")}+{load_config.get("DATABASE_DRIVER")}'
dsn = dsn_from_config(load_config)
assert dsn == f"{scheme}://{load_config.get('DATABASE_USER')}:{load_config.get('DATABASE_PASSWORD')}@{load_config.get('DATABASE_HOST')}:{load_config.get('DATABASE_PORT')}/{load_config.get('DATABASE_NAME')}"
# undoes overrides to revert engine and drivers to sqlite
overrides = {
'DATABASE_PASSWORD': '',
'DATABASE_DRIVER': 'pysqlite',
'DATABASE_ENGINE': 'sqlite'
}
load_config.dict_override(dct=overrides, dct_description='Override values to test different db formats.')
# test dsn for sqlite engine
dsn = dsn_from_config(load_config)
scheme = f'{load_config.get("DATABASE_ENGINE")}+{load_config.get("DATABASE_DRIVER")}'
assert dsn == f'{scheme}:///{load_config.get("DATABASE_NAME")}'

View File

@ -0,0 +1,75 @@
# standard imports
import logging
import os
# external imports
import pytest
import requests_mock
# local imports
from cic_notify.error import NotInitializedError, AlreadyInitializedError, NotificationSendError
from cic_notify.tasks.sms.africastalking import AfricasTalkingNotifier
# test imports
from tests.helpers.phone import phone_number
def test_africas_talking_notifier(africastalking_response, caplog):
caplog.set_level(logging.DEBUG)
with pytest.raises(NotInitializedError) as error:
AfricasTalkingNotifier()
assert str(error.value) == ''
api_key = os.urandom(24).hex()
sender_id = 'bar'
username = 'sandbox'
AfricasTalkingNotifier.initialize(username, api_key, sender_id)
africastalking_notifier = AfricasTalkingNotifier()
assert africastalking_notifier.sender_id == sender_id
assert africastalking_notifier.initiated is True
with pytest.raises(AlreadyInitializedError) as error:
AfricasTalkingNotifier.initialize(username, api_key, sender_id)
assert str(error.value) == ''
with requests_mock.Mocker(real_http=False) as request_mocker:
message = 'Hello world.'
recipient = phone_number()
africastalking_response.get('SMSMessageData').get('Recipients')[0]['number'] = recipient
request_mocker.register_uri(method='POST',
headers={'content-type': 'application/json'},
json=africastalking_response,
url='https://api.sandbox.africastalking.com/version1/messaging',
status_code=200)
africastalking_notifier.send(message, recipient)
assert f'Africastalking response sender-id {africastalking_response}' in caplog.text
africastalking_notifier.sender_id = None
africastalking_notifier.send(message, recipient)
assert f'africastalking response no-sender-id {africastalking_response}' in caplog.text
with pytest.raises(NotificationSendError) as error:
status = 'InvalidPhoneNumber'
status_code = 403
africastalking_response.get('SMSMessageData').get('Recipients')[0]['status'] = status
africastalking_response.get('SMSMessageData').get('Recipients')[0]['statusCode'] = status_code
request_mocker.register_uri(method='POST',
headers={'content-type': 'application/json'},
json=africastalking_response,
url='https://api.sandbox.africastalking.com/version1/messaging',
status_code=200)
africastalking_notifier.send(message, recipient)
assert str(error.value) == f'Sending notification failed due to: {status}'
with pytest.raises(NotificationSendError) as error:
recipients = []
status = 'InsufficientBalance'
africastalking_response.get('SMSMessageData')['Recipients'] = recipients
africastalking_response.get('SMSMessageData')['Message'] = status
request_mocker.register_uri(method='POST',
headers={'content-type': 'application/json'},
json=africastalking_response,
url='https://api.sandbox.africastalking.com/version1/messaging',
status_code=200)
africastalking_notifier.send(message, recipient)
assert str(error.value) == f'Unexpected number of recipients: {len(recipients)}. Status: {status}'

View File

@ -0,0 +1,26 @@
# standard imports
# external imports
import celery
# local imports
from cic_notify.db.enum import NotificationStatusEnum, NotificationTransportEnum
from cic_notify.db.models.notification import Notification
# test imports
from tests.helpers.phone import phone_number
def test_persist_notification(celery_session_worker, init_database):
message = 'Hello world.'
recipient = phone_number()
s_persist_notification = celery.signature(
'cic_notify.tasks.sms.db.persist_notification', (message, recipient)
)
s_persist_notification.apply_async().get()
notification = Notification.session.query(Notification).filter_by(recipient=recipient).first()
assert notification.status == NotificationStatusEnum.UNKNOWN
assert notification.recipient == recipient
assert notification.message == message
assert notification.transport == NotificationTransportEnum.SMS

View File

@ -0,0 +1,21 @@
# standard imports
import logging
# external imports
import celery
# local imports
# test imports
from tests.helpers.phone import phone_number
def test_log(caplog, celery_session_worker):
message = 'Hello world.'
recipient = phone_number()
caplog.set_level(logging.INFO)
s_log = celery.signature(
'cic_notify.tasks.sms.log.log', [message, recipient]
)
s_log.apply_async().get()
assert f'message to {recipient}: {message}' in caplog.text

View File

@ -0,0 +1,24 @@
# standard imports
# external imports
import celery
# local imports
from cic_notify.api import Api
# test imports
from tests.helpers.phone import phone_number
def test_api(celery_session_worker, mocker):
mocked_group = mocker.patch('celery.group')
message = 'Hello world.'
recipient = phone_number()
s_send = celery.signature('cic_notify.tasks.sms.africastalking.send', [message, recipient], queue=None)
s_log = celery.signature('cic_notify.tasks.sms.log.log', [message, recipient], queue=None)
s_persist_notification = celery.signature(
'cic_notify.tasks.sms.db.persist_notification', [message, recipient], queue=None)
signatures = [s_send, s_log, s_persist_notification]
api = Api(queue=None)
api.sms(message, recipient)
mocked_group.assert_called_with(signatures)

View File

@ -1,31 +1,13 @@
# standard imports
import sys
import os
import pytest
import logging
# third party imports
import confini
script_dir = os.path.dirname(os.path.realpath(__file__))
root_dir = os.path.dirname(script_dir)
sys.path.insert(0, root_dir)
# local imports
from cic_notify.db.models.base import SessionBase
#from transport.notification import AfricastalkingNotification
# fixtures
from tests.fixtures_config import *
from tests.fixtures_celery import *
from tests.fixtures_database import *
# test imports
logg = logging.getLogger()
#@pytest.fixture(scope='session')
#def africastalking_notification(
# load_config,
# ):
# return AfricastalkingNotificationTransport(load_config)
#
from .fixtures.celery import *
from .fixtures.config import *
from .fixtures.database import *
from .fixtures.result import *

View File

@ -37,12 +37,6 @@ def celery_config():
shutil.rmtree(rq)
@pytest.fixture(scope='session')
def celery_worker_parameters():
return {
# 'queues': ('cic-notify'),
}
@pytest.fixture(scope='session')
def celery_enable_logging():
return True

View File

@ -0,0 +1,32 @@
# standard imports
import os
import logging
# external imports
import pytest
from confini import Config
logg = logging.getLogger(__file__)
fixtures_dir = os.path.dirname(__file__)
root_directory = os.path.dirname(os.path.dirname(fixtures_dir))
@pytest.fixture(scope='session')
def alembic_config():
migrations_directory = os.path.join(root_directory, 'cic_notify', 'db', 'migrations', 'default')
file = os.path.join(migrations_directory, 'alembic.ini')
return {
'file': file,
'script_location': migrations_directory
}
@pytest.fixture(scope='session')
def load_config():
config_directory = os.path.join(root_directory, '.config/test')
config = Config(default_dir=config_directory)
config.process()
logg.debug('config loaded\n{}'.format(config))
return config

View File

@ -0,0 +1,54 @@
# standard imports
import os
# third-party imports
import pytest
import alembic
from alembic.config import Config as AlembicConfig
# local imports
from cic_notify.db import dsn_from_config
from cic_notify.db.models.base import SessionBase, create_engine
from .config import root_directory
@pytest.fixture(scope='session')
def alembic_engine(load_config):
data_source_name = dsn_from_config(load_config)
return create_engine(data_source_name)
@pytest.fixture(scope='session')
def database_engine(load_config):
if load_config.get('DATABASE_ENGINE') == 'sqlite':
try:
os.unlink(load_config.get('DATABASE_NAME'))
except FileNotFoundError:
pass
dsn = dsn_from_config(load_config)
SessionBase.connect(dsn)
return dsn
@pytest.fixture(scope='function')
def init_database(load_config, database_engine):
db_directory = os.path.join(root_directory, 'cic_notify', 'db')
migrations_directory = os.path.join(db_directory, 'migrations', load_config.get('DATABASE_ENGINE'))
if not os.path.isdir(migrations_directory):
migrations_directory = os.path.join(db_directory, 'migrations', 'default')
session = SessionBase.create_session()
alembic_config = AlembicConfig(os.path.join(migrations_directory, 'alembic.ini'))
alembic_config.set_main_option('sqlalchemy.url', database_engine)
alembic_config.set_main_option('script_location', migrations_directory)
alembic.command.downgrade(alembic_config, 'base')
alembic.command.upgrade(alembic_config, 'head')
yield session
session.commit()
session.close()

View File

@ -0,0 +1,24 @@
# standard imports
# external imports
import pytest
# local imports
# test imports
@pytest.fixture(scope="function")
def africastalking_response():
return {
"SMSMessageData": {
"Message": "Sent to 1/1 Total Cost: KES 0.8000",
"Recipients": [{
"statusCode": 101,
"number": "+254711XXXYYY",
"status": "Success",
"cost": "KES 0.8000",
"messageId": "ATPid_SampleTxnId123"
}]
}
}

View File

@ -1,20 +0,0 @@
# standard imports
import os
import logging
# third-party imports
import pytest
import confini
script_dir = os.path.dirname(os.path.realpath(__file__))
root_dir = os.path.dirname(script_dir)
logg = logging.getLogger(__file__)
@pytest.fixture(scope='session')
def load_config():
config_dir = os.path.join(root_dir, '.config/test')
conf = confini.Config(config_dir, 'CICTEST')
conf.process()
logg.debug('config {}'.format(conf))
return conf

View File

@ -1,48 +0,0 @@
# standard imports
import os
# third-party imports
import pytest
import alembic
from alembic.config import Config as AlembicConfig
# local imports
from cic_notify.db import SessionBase
from cic_notify.db import dsn_from_config
@pytest.fixture(scope='session')
def database_engine(
load_config,
):
dsn = dsn_from_config(load_config)
SessionBase.connect(dsn)
return dsn
@pytest.fixture(scope='function')
def init_database(
load_config,
database_engine,
):
rootdir = os.path.dirname(os.path.dirname(__file__))
dbdir = os.path.join(rootdir, 'cic_notify', 'db')
migrationsdir = os.path.join(dbdir, 'migrations', load_config.get('DATABASE_ENGINE'))
if not os.path.isdir(migrationsdir):
migrationsdir = os.path.join(dbdir, 'migrations', 'default')
session = SessionBase.create_session()
ac = AlembicConfig(os.path.join(migrationsdir, 'alembic.ini'))
ac.set_main_option('sqlalchemy.url', database_engine)
ac.set_main_option('script_location', migrationsdir)
alembic.command.downgrade(ac, 'base')
alembic.command.upgrade(ac, 'head')
yield session
session.commit()
session.close()

View File

@ -0,0 +1,16 @@
# standard imports
# external imports
from faker import Faker
from faker_e164.providers import E164Provider
# local imports
# test imports
fake = Faker()
fake.add_provider(E164Provider)
def phone_number() -> str:
return fake.e164('KE')

View File

@ -1,34 +0,0 @@
# standard imports
import json
# third party imports
import pytest
import celery
# local imports
from cic_notify.tasks.sms import db
from cic_notify.tasks.sms import log
def test_log_notification(
celery_session_worker,
):
recipient = '+25412121212'
content = 'bar'
s_log = celery.signature('cic_notify.tasks.sms.log.log')
t = s_log.apply_async(args=[recipient, content])
r = t.get()
def test_db_notification(
init_database,
celery_session_worker,
):
recipient = '+25412121213'
content = 'foo'
s_db = celery.signature('cic_notify.tasks.sms.db.persist_notification')
t = s_db.apply_async(args=[recipient, content])
r = t.get()

View File

@ -3,4 +3,5 @@ omit =
venv/*
scripts/*
cic_ussd/db/migrations/*
cic_ussd/runnable/*
cic_ussd/runnable/*
cic_ussd/version.py

View File

@ -0,0 +1,22 @@
# standard imports
# external imports
# local imports
class Guardianship:
guardians: list = []
@classmethod
def load_system_guardians(cls, guardians_file: str):
with open(guardians_file, 'r') as system_guardians:
cls.guardians = [line.strip() for line in system_guardians]
def is_system_guardian(self, phone_number: str):
"""
:param phone_number:
:type phone_number:
:return:
:rtype:
"""
return phone_number in self.guardians

View File

@ -13,7 +13,6 @@ from cic_types.condiments import MetadataPointer
from cic_ussd.account.chain import Chain
from cic_ussd.account.transaction import from_wei
from cic_ussd.cache import cache_data_key, get_cached_data
from cic_ussd.translation import translation_for
logg = logging.getLogger(__name__)
@ -97,17 +96,3 @@ def query_statement(blockchain_address: str, limit: int = 9):
callback_param=blockchain_address
)
cic_eth_api.list(address=blockchain_address, limit=limit)
def statement_transaction_set(preferred_language: str, transaction_reprs: list):
"""
:param preferred_language:
:type preferred_language:
:param transaction_reprs:
:type transaction_reprs:
:return:
:rtype:
"""
if not transaction_reprs:
return translation_for('helpers.no_transaction_history', preferred_language)
return ''.join(f'{transaction_repr}\n' for transaction_repr in transaction_reprs)

View File

@ -14,8 +14,7 @@ from cic_ussd.account.chain import Chain
from cic_ussd.cache import cache_data, cache_data_key, get_cached_data
from cic_ussd.error import CachedDataNotFoundError, SeppukuError
from cic_ussd.metadata.tokens import query_token_info, query_token_metadata
from cic_ussd.processor.util import wait_for_cache
from cic_ussd.translation import translation_for
from cic_ussd.processor.poller import wait_for_cache
logg = logging.getLogger(__file__)
@ -326,16 +325,3 @@ def set_active_token(blockchain_address: str, token_symbol: str):
cache_data(key=key, data=token_symbol)
def token_list_set(preferred_language: str, token_data_reprs: list):
"""
:param preferred_language:
:type preferred_language:
:param token_data_reprs:
:type token_data_reprs:
:return:
:rtype:
"""
if not token_data_reprs:
return translation_for('helpers.no_tokens_list', preferred_language)
return ''.join(f'{token_data_repr}\n' for token_data_repr in token_data_reprs)

View File

@ -14,7 +14,7 @@ class Cache:
store: Redis = None
def cache_data(key: str, data: str):
def cache_data(key: str, data: [bytes, float, int, str]):
"""
:param key:
:type key:
@ -55,5 +55,6 @@ def cache_data_key(identifier: Union[list, bytes], salt: MetadataPointer):
hash_object.update(identity)
else:
hash_object.update(identifier)
hash_object.update(salt.value.encode(encoding="utf-8"))
if salt != MetadataPointer.NONE:
hash_object.update(salt.value.encode(encoding="utf-8"))
return hash_object.digest().hex()

View File

@ -0,0 +1,9 @@
+254707628499
+254757628885
+254757628900
+254792048646
+254792048228
+254792048490
+254792048902
+254727806655
+254790079966

View File

@ -0,0 +1,19 @@
keys,en,sw,kam,kik,miji,luo,bor
female,Female,Mwanamke,Mundumuka,Mutumia,Muche,Dhako,Uwole
from,From,Kutoka kwa,Kuma kwa,Kuuma kwa,Ulaako,Kowuok kuom,ira
male,Male,Mwanaume,Mundume,Mundurume,Mulume,Dichuo,Dir
not_provided,Not provided,Haijawekwa,Inenganitwe,Ndiikiritwo,Kaphana,Okoketi,Kes inkan
no_language_list,No language list,Hamna lugha ya kuchagua,Vai luka ya kusakwa,Hatire ruthiomi rwakucagurwo,Kahana luga irio orodeshwa,Onge dhok miyiero,Afaan chaguad injirt
no_transaction_history,No transaction history,Hamna ripoti ya matumizi,Vai livoti ya utumii,Hatire riboti ya mahuthira,Kahana repoti ya mahumizi,Onge ripot mar tiyo,Odhuu jalkaban injirt
no_tokens_list,No more Sarafu,Hamna sarafu zingine,Vai Sarafu ingi,Hatire Sarafu inge,Kahana Sarafu zaidi,Onge Sarafu moko,Sarafu dibii injirt
other,Other,Nyingine,Ingi,Inge,Nyinjine,Moko,Ta dibii
received,Received,Ulipokea,Niwakwatie,Niuramukirire ,Hokera,Niyudo,Argat
sent,Sent,Ulituma,Niwatumie,Niuratumire,Humwa,Nioro,Ergan
to,To,Kwa,Kwa,Hare,Kwa,Ne,Es
guardians_list_header,Your PIN guards are:,PIN Walinzi uliowaongeza ni:,PIN Atetheesya ala wongelile ni:,Agiteri a PIN yaku ni:,PIN Aimirizi urioika ni:,PIN Jorit magi gin:,PIN Naam at korkorad:
no_guardians_list,No PIN guardians set,Hamna PIN walinzi walioongezwa,Vai atetheesya mongelwa,Hartire agiteri meekeretwo,Kahana aimirizi adzoikwa,Onge jorit moketi,Nam an korkorad injirt
error.no_phone_number_provided,No phone number was provided,Nambari ya simu haijawekwa,Namba ya simu inaikiwa,Namba ya thimu ndihianetwo,Kahana namba ya simu idzopewa,Namba mar simu okoketi,Namba simu kees inkaan
error.no_matching_account,The number provided is not registered,Nambari uliyoweka haijasajiliwa,Namba ya simu ila wekiya ti mbandikithye,Namba iria wekera ndiandekithetwo,Namba idzopewa kaidzagwe kusajiliwa,Namba mar simu miketo pok ondiki,Namba ka at kekeet sajiil incab
error.is_initiator,Phone number cannot be your own,Nambari yafaa kuwa tofauti na yako,Namba ya simu yaile ithiwa itavwanene na yaku,Namba ifatie gukorwo ina utiganu na yaku,Namba yasimu kaidima kukala niyako,Namba onego obed mopogre gimari,Namba simu tete tau mal
error.is_existent_guardian,This phone number is already added as a PIN guardian,Nambari hii tayari imeongezwa kama mlinzi wa nambari ya siri,Namba ii niyongeletwe tayari ta mutethesya wa kusovya pin,Namba ino niyongereirwo ta murugamereri ya namba ya thiri,Nambari ii yasimu yaikwa kare Muimirizi,Nambani oseketi kaka jarit,Namba tana yayu nam korkoradi taat
error.is_not_existent_guardian,Phone number not set as PIN guardian,Nambari hii haijaongezwa kama mlinzi wa nambari ya PIN,Namba ii iyongeletwe ta mutethesya wa kusovya PIN,Namba ino ndiongereirwo ta mugiteri wa PIN,Nambari ii yasimu kaiikika kugaluza PIN zda mwimirizi,Nambani pok omed kaka jarit,Namba simu ta nam korkorad indharan
1 keys en sw kam kik miji luo bor
2 female Female Mwanamke Mundumuka Mutumia Muche Dhako Uwole
3 from From Kutoka kwa Kuma kwa Kuuma kwa Ulaako Kowuok kuom ira
4 male Male Mwanaume Mundume Mundurume Mulume Dichuo Dir
5 not_provided Not provided Haijawekwa Inenganitwe Ndiikiritwo Kaphana Okoketi Kes inkan
6 no_language_list No language list Hamna lugha ya kuchagua Vai luka ya kusakwa Hatire ruthiomi rwakucagurwo Kahana luga irio orodeshwa Onge dhok miyiero Afaan chaguad injirt
7 no_transaction_history No transaction history Hamna ripoti ya matumizi Vai livoti ya utumii Hatire riboti ya mahuthira Kahana repoti ya mahumizi Onge ripot mar tiyo Odhuu jalkaban injirt
8 no_tokens_list No more Sarafu Hamna sarafu zingine Vai Sarafu ingi Hatire Sarafu inge Kahana Sarafu zaidi Onge Sarafu moko Sarafu dibii injirt
9 other Other Nyingine Ingi Inge Nyinjine Moko Ta dibii
10 received Received Ulipokea Niwakwatie Niuramukirire Hokera Niyudo Argat
11 sent Sent Ulituma Niwatumie Niuratumire Humwa Nioro Ergan
12 to To Kwa Kwa Hare Kwa Ne Es
13 guardians_list_header Your PIN guards are: PIN Walinzi uliowaongeza ni: PIN Atetheesya ala wongelile ni: Agiteri a PIN yaku ni: PIN Aimirizi urioika ni: PIN Jorit magi gin: PIN Naam at korkorad:
14 no_guardians_list No PIN guardians set Hamna PIN walinzi walioongezwa Vai atetheesya mongelwa Hartire agiteri meekeretwo Kahana aimirizi adzoikwa Onge jorit moketi Nam an korkorad injirt
15 error.no_phone_number_provided No phone number was provided Nambari ya simu haijawekwa Namba ya simu inaikiwa Namba ya thimu ndihianetwo Kahana namba ya simu idzopewa Namba mar simu okoketi Namba simu kees inkaan
16 error.no_matching_account The number provided is not registered Nambari uliyoweka haijasajiliwa Namba ya simu ila wekiya ti mbandikithye Namba iria wekera ndiandekithetwo Namba idzopewa kaidzagwe kusajiliwa Namba mar simu miketo pok ondiki Namba ka at kekeet sajiil incab
17 error.is_initiator Phone number cannot be your own Nambari yafaa kuwa tofauti na yako Namba ya simu yaile ithiwa itavwanene na yaku Namba ifatie gukorwo ina utiganu na yaku Namba yasimu kaidima kukala niyako Namba onego obed mopogre gimari Namba simu tete tau mal
18 error.is_existent_guardian This phone number is already added as a PIN guardian Nambari hii tayari imeongezwa kama mlinzi wa nambari ya siri Namba ii niyongeletwe tayari ta mutethesya wa kusovya pin Namba ino niyongereirwo ta murugamereri ya namba ya thiri Nambari ii yasimu yaikwa kare Muimirizi Nambani oseketi kaka jarit Namba tana yayu nam korkoradi taat
19 error.is_not_existent_guardian Phone number not set as PIN guardian Nambari hii haijaongezwa kama mlinzi wa nambari ya PIN Namba ii iyongeletwe ta mutethesya wa kusovya PIN Namba ino ndiongereirwo ta mugiteri wa PIN Nambari ii yasimu kaiikika kugaluza PIN zda mwimirizi Nambani pok omed kaka jarit Namba simu ta nam korkorad indharan

View File

@ -0,0 +1,9 @@
{
"en": "English",
"sw": "Kiswahili",
"kam": "Kamba",
"kik": "Kikiuyu",
"miji": "Mijikenda",
"luo": "Luo",
"bor": "Borana"
}

View File

@ -0,0 +1,7 @@
keys,en,sw,kam,kik,miji,luo,bor
account_successfully_created,You have been registered on Sarafu Network! To use dial *384*96# on Safaricom and *483*96# on other networks. For help %{support_phone},Umesajiliwa kwa Sarafu Network! Kutumia bonyeza *384*96# Safaricom ama *483*46# kwa utandao tofauti. Kwa Usaidizi %{support_phone},Niwayandikithya na Sarafu Network! Safaricom kuna namba ii *384*96# mitandao ingi *483*96#. Utethyo ungi kuna %{support_phone},Niweyandekithia kwe Sarafu Network! Kuhuthira hihinya *384*96# he Safaricom na *483*46# he mitambo ingi Uteithio %{support_phone},Usajiliwa Sarafu Network! kuhumira hopya *384*96# Saf *483*96# mtandao mnjine. Kuvizwa %{support_phone},Osendiki e Sarafu Network! Kidwatiyogo to dii *384*96# Safaricom kata *483*46# e netwak mamoko. Kuom kony %{support_phone},Yaayu sirejestan Sarafu Network! Kuches *384*96# Safaricom *483*46# Airtel
received_tokens,Successfully received %{amount} %{token_symbol} from %{tx_sender_information} %{timestamp} to %{tx_recipient_information} Balance %{balance} %{token_symbol},Umepokea %{amount} %{token_symbol} kutoka kwa %{tx_sender_information} %{timestamp} kuendea %{tx_recipient_information} Salio %{balance} %{token_symbol},Niwakwata %{amount} %{token_symbol} kuma %{tx_sender_information} %{timestamp} kuvikia %{tx_recipient_information} Mbalansi %{balance} %{token_symbol},Wamukira %{amount} %{token_symbol} kuuma kwa %{tx_sender_information} %{timestamp} to %{tx_recipient_information} Watigaria %{balance} %{token_symbol},Uphokera %{amount} %{token_symbol} kula %{tx_sender_information} %{timestamp} Kwenda %{tx_recipient_information}. Sazoro %{balance} %{token_symbol},Iyudo %{amount} %{token_symbol} kowuok kuom %{tx_sender_information} %{timestamp} odhi ne %{tx_recipient_information}. Dong mari en %{balance} %{token_symbol},Yaargat %{amount} %{token_symbol} ira %{tx_sender_information} %{timestamp} Es %{tx_recipient_information} Balansi %{balance} %{token_symbol}
sent_tokens,Successfully sent %{amount} %{token_symbol} to %{tx_recipient_information} %{timestamp} from %{tx_sender_information} Balance %{balance} %{token_symbol},Umetuma %{amount} %{token_symbol} kwa %{tx_recipient_information} %{timestamp} kutoka kwa %{tx_sender_information} Salio %{balance} %{token_symbol},Niwatuma %{amount} %{token_symbol} kwa %{tx_recipient_information} %{timestamp} kuma %{tx_sender_information} Mbalansi %{balance} %{token_symbol}.,Watuma %{amount} %{token_symbol} kwe %{tx_recipient_information} %{timestamp} kuuma %{tx_sender_information} Watigaria %{balance} %{token_symbol},Uhuma %{amount} %{token_symbol} kwa %{tx_recipient_information} %{timestamp} kula %{tx_sender_information} Sazoro %{balance} %{token_symbol},Ioro %{amount} %{token_symbol} ne %{tx_recipient_information} %{timestamp} kowuok kuom %{tx_sender_information}. Dong mari en %{balance} %{token_symbol},yaergat %{amount} %{token_symbol} Es %{tx_recipient_information} %{timestamp} ira %{tx_sender_information} Balansi hareetin %{balance} %{token_symbol}
terms,By using the service you agree to the terms and conditions at http://grassecon.org/tos,Kwa kutumia hii huduma umekubali sheria na masharti yafuatayo http://grassecon.org/tos,Kwa kutumia mutandao uu niwetikilana na miyao na masharti ma http://grassecon.org/tos,"Kuhuthira mitambo ino , niuraetekania na mawatho na mutaratara wa http://grassecon.org/tos","Kuhumira huduma,Ukubali sheria na malagizo http://grassecon.org/tos ",Kuom tiyo gi huduma ni iyie chike kod weche mantie http://grassecon.org/tos,Oja service tun tumiith yaayuu kubalt one chuf at http://grassecon.org/tos
upsell_unregistered_recipient,%{tx_sender_information} tried to send you %{token_symbol}. Dial *384*96# on Safaricom and *483*96# on others For help %{support_phone},%{tx_sender_information} amejaribu kutuma %{token_symbol} na hujasajili. Bonyeza*384*96# Saf au*483*46# kwa mitandao tofauti. Usaidizi %{support_phone},%{tx_sender_information} niwatata kuutumia %{token_symbol} lakini ndwimwandikithye. Safaricom kuna *384*96# laini ingi *483*96# Utethyo %{support_phone},%{tx_sender_information} ekugeretie gugutumira %{token_symbol} no ndeyandikithetie. Hihinya *384*96# he Safaricom na *483*96# mitambo ingi. Uteithio %{support_phone},%{tx_sender_information} Yuhuma %{token_symbol} Kudzasajiliwa. Humira hopya *384*96# Safaricom au *483*96# mtandao mnjine. Kuvizwa %{support_phone},%{tx_sender_information} otemo oro ni %{token_symbol} to pok ondiki. Tiyo go dii *384*96# Safaricom gi *483*96# e netwak mamoko. E kony %{support_phone},%{tx_sender_information} yaa si ergu jariib %{token_symbol} ammo atin insajilan.Tumiitu kuches *384*96# Safaricom *483*96# dibii Qarqars %{support_phone}
pin_reset_initiated,%{pin_initiator} has sent a request to initiate your PIN reset,%{pin_initiator} ametuma ombi la kubadilisha PIN yako,%{pin_initiator} niwatuma wendi waku wa kwambiisya kusovya PIN yaku,%{pin_initiator} Niatuma ihoya ria guchengia PIN yaku,%{pin_initiator} yuhuma voyo kurekebisha piniyo.,%{pin_initiator} ooro kwayo mar loko nambani mopondo,%{pin_initiator} pin Tate badilishadu feet
1 keys en sw kam kik miji luo bor
2 account_successfully_created You have been registered on Sarafu Network! To use dial *384*96# on Safaricom and *483*96# on other networks. For help %{support_phone} Umesajiliwa kwa Sarafu Network! Kutumia bonyeza *384*96# Safaricom ama *483*46# kwa utandao tofauti. Kwa Usaidizi %{support_phone} Niwayandikithya na Sarafu Network! Safaricom kuna namba ii *384*96# mitandao ingi *483*96#. Utethyo ungi kuna %{support_phone} Niweyandekithia kwe Sarafu Network! Kuhuthira hihinya *384*96# he Safaricom na *483*46# he mitambo ingi Uteithio %{support_phone} Usajiliwa Sarafu Network! kuhumira hopya *384*96# Saf *483*96# mtandao mnjine. Kuvizwa %{support_phone} Osendiki e Sarafu Network! Kidwatiyogo to dii *384*96# Safaricom kata *483*46# e netwak mamoko. Kuom kony %{support_phone} Yaayu sirejestan Sarafu Network! Kuches *384*96# Safaricom *483*46# Airtel
3 received_tokens Successfully received %{amount} %{token_symbol} from %{tx_sender_information} %{timestamp} to %{tx_recipient_information} Balance %{balance} %{token_symbol} Umepokea %{amount} %{token_symbol} kutoka kwa %{tx_sender_information} %{timestamp} kuendea %{tx_recipient_information} Salio %{balance} %{token_symbol} Niwakwata %{amount} %{token_symbol} kuma %{tx_sender_information} %{timestamp} kuvikia %{tx_recipient_information} Mbalansi %{balance} %{token_symbol} Wamukira %{amount} %{token_symbol} kuuma kwa %{tx_sender_information} %{timestamp} to %{tx_recipient_information} Watigaria %{balance} %{token_symbol} Uphokera %{amount} %{token_symbol} kula %{tx_sender_information} %{timestamp} Kwenda %{tx_recipient_information}. Sazoro %{balance} %{token_symbol} Iyudo %{amount} %{token_symbol} kowuok kuom %{tx_sender_information} %{timestamp} odhi ne %{tx_recipient_information}. Dong mari en %{balance} %{token_symbol} Yaargat %{amount} %{token_symbol} ira %{tx_sender_information} %{timestamp} Es %{tx_recipient_information} Balansi %{balance} %{token_symbol}
4 sent_tokens Successfully sent %{amount} %{token_symbol} to %{tx_recipient_information} %{timestamp} from %{tx_sender_information} Balance %{balance} %{token_symbol} Umetuma %{amount} %{token_symbol} kwa %{tx_recipient_information} %{timestamp} kutoka kwa %{tx_sender_information} Salio %{balance} %{token_symbol} Niwatuma %{amount} %{token_symbol} kwa %{tx_recipient_information} %{timestamp} kuma %{tx_sender_information} Mbalansi %{balance} %{token_symbol}. Watuma %{amount} %{token_symbol} kwe %{tx_recipient_information} %{timestamp} kuuma %{tx_sender_information} Watigaria %{balance} %{token_symbol} Uhuma %{amount} %{token_symbol} kwa %{tx_recipient_information} %{timestamp} kula %{tx_sender_information} Sazoro %{balance} %{token_symbol} Ioro %{amount} %{token_symbol} ne %{tx_recipient_information} %{timestamp} kowuok kuom %{tx_sender_information}. Dong mari en %{balance} %{token_symbol} yaergat %{amount} %{token_symbol} Es %{tx_recipient_information} %{timestamp} ira %{tx_sender_information} Balansi hareetin %{balance} %{token_symbol}
5 terms By using the service you agree to the terms and conditions at http://grassecon.org/tos Kwa kutumia hii huduma umekubali sheria na masharti yafuatayo http://grassecon.org/tos Kwa kutumia mutandao uu niwetikilana na miyao na masharti ma http://grassecon.org/tos Kuhuthira mitambo ino , niuraetekania na mawatho na mutaratara wa http://grassecon.org/tos Kuhumira huduma,Ukubali sheria na malagizo http://grassecon.org/tos Kuom tiyo gi huduma ni iyie chike kod weche mantie http://grassecon.org/tos Oja service tun tumiith yaayuu kubalt one chuf at http://grassecon.org/tos
6 upsell_unregistered_recipient %{tx_sender_information} tried to send you %{token_symbol}. Dial *384*96# on Safaricom and *483*96# on others For help %{support_phone} %{tx_sender_information} amejaribu kutuma %{token_symbol} na hujasajili. Bonyeza*384*96# Saf au*483*46# kwa mitandao tofauti. Usaidizi %{support_phone} %{tx_sender_information} niwatata kuutumia %{token_symbol} lakini ndwimwandikithye. Safaricom kuna *384*96# laini ingi *483*96# Utethyo %{support_phone} %{tx_sender_information} ekugeretie gugutumira %{token_symbol} no ndeyandikithetie. Hihinya *384*96# he Safaricom na *483*96# mitambo ingi. Uteithio %{support_phone} %{tx_sender_information} Yuhuma %{token_symbol} Kudzasajiliwa. Humira hopya *384*96# Safaricom au *483*96# mtandao mnjine. Kuvizwa %{support_phone} %{tx_sender_information} otemo oro ni %{token_symbol} to pok ondiki. Tiyo go dii *384*96# Safaricom gi *483*96# e netwak mamoko. E kony %{support_phone} %{tx_sender_information} yaa si ergu jariib %{token_symbol} ammo atin insajilan.Tumiitu kuches *384*96# Safaricom *483*96# dibii Qarqars %{support_phone}
7 pin_reset_initiated %{pin_initiator} has sent a request to initiate your PIN reset %{pin_initiator} ametuma ombi la kubadilisha PIN yako %{pin_initiator} niwatuma wendi waku wa kwambiisya kusovya PIN yaku %{pin_initiator} Niatuma ihoya ria guchengia PIN yaku %{pin_initiator} yuhuma voyo kurekebisha piniyo. %{pin_initiator} ooro kwayo mar loko nambani mopondo %{pin_initiator} pin Tate badilishadu feet

File diff suppressed because it is too large Load Diff

View File

@ -2,417 +2,441 @@
"ussd_menu": {
"1": {
"description": "Entry point for users to select their preferred language.",
"display_key": "ussd.kenya.initial_language_selection",
"display_key": "ussd.initial_language_selection",
"name": "initial_language_selection",
"parent": null
},
"2": {
"description": "Entry point for users to enter a pin to secure their account.",
"display_key": "ussd.kenya.initial_pin_entry",
"display_key": "ussd.initial_pin_entry",
"name": "initial_pin_entry",
"parent": null
},
"3": {
"description": "Pin confirmation entry menu.",
"display_key": "ussd.kenya.initial_pin_confirmation",
"display_key": "ussd.initial_pin_confirmation",
"name": "initial_pin_confirmation",
"parent": "initial_pin_entry"
},
"4": {
"description": "The signup process has been initiated and the account is being created.",
"display_key": "ussd.kenya.account_creation_prompt",
"display_key": "ussd.account_creation_prompt",
"name": "account_creation_prompt",
"parent": null
},
"5": {
"description": "Entry point for activated users.",
"display_key": "ussd.kenya.start",
"display_key": "ussd.start",
"name": "start",
"parent": null
},
"6": {
"description": "Given name entry menu.",
"display_key": "ussd.kenya.enter_given_name",
"display_key": "ussd.enter_given_name",
"name": "enter_given_name",
"parent": "metadata_management"
},
"7": {
"description": "Family name entry menu.",
"display_key": "ussd.kenya.enter_family_name",
"display_key": "ussd.enter_family_name",
"name": "enter_family_name",
"parent": "metadata_management"
},
"8": {
"description": "Gender entry menu.",
"display_key": "ussd.kenya.enter_gender",
"display_key": "ussd.enter_gender",
"name": "enter_gender",
"parent": "metadata_management"
},
"9": {
"description": "Age entry menu.",
"display_key": "ussd.kenya.enter_gender",
"display_key": "ussd.enter_gender",
"name": "enter_gender",
"parent": "metadata_management"
},
"10": {
"description": "Location entry menu.",
"display_key": "ussd.kenya.enter_location",
"display_key": "ussd.enter_location",
"name": "enter_location",
"parent": "metadata_management"
},
"11": {
"description": "Products entry menu.",
"display_key": "ussd.kenya.enter_products",
"display_key": "ussd.enter_products",
"name": "enter_products",
"parent": "metadata_management"
},
"12": {
"description": "Entry point for activated users.",
"display_key": "ussd.kenya.start",
"display_key": "ussd.start",
"name": "start",
"parent": null
},
"13": {
"description": "Send Token recipient entry.",
"display_key": "ussd.kenya.enter_transaction_recipient",
"display_key": "ussd.enter_transaction_recipient",
"name": "enter_transaction_recipient",
"parent": "start"
},
"14": {
"description": "Send Token amount prompt menu.",
"display_key": "ussd.kenya.enter_transaction_amount",
"display_key": "ussd.enter_transaction_amount",
"name": "enter_transaction_amount",
"parent": "start"
},
"15": {
"description": "Pin entry for authorization to send token.",
"display_key": "ussd.kenya.transaction_pin_authorization",
"display_key": "ussd.transaction_pin_authorization",
"name": "transaction_pin_authorization",
"parent": "start"
},
"16": {
"description": "Manage account menu.",
"display_key": "ussd.kenya.account_management",
"display_key": "ussd.account_management",
"name": "account_management",
"parent": "start"
},
"17": {
"description": "Manage metadata menu.",
"display_key": "ussd.kenya.metadata_management",
"display_key": "ussd.metadata_management",
"name": "metadata_management",
"parent": "start"
},
"18": {
"description": "Manage user's preferred language menu.",
"display_key": "ussd.kenya.select_preferred_language",
"display_key": "ussd.select_preferred_language",
"name": "select_preferred_language",
"parent": "account_management"
},
"19": {
"description": "Retrieve mini-statement menu.",
"display_key": "ussd.kenya.mini_statement_pin_authorization",
"display_key": "ussd.mini_statement_pin_authorization",
"name": "mini_statement_pin_authorization",
"parent": "account_management"
},
"20": {
"description": "Manage user's pin menu.",
"display_key": "ussd.kenya.enter_current_pin",
"display_key": "ussd.enter_current_pin",
"name": "enter_current_pin",
"parent": "account_management"
},
"21": {
"description": "New pin entry menu.",
"display_key": "ussd.kenya.enter_new_pin",
"display_key": "ussd.enter_new_pin",
"name": "enter_new_pin",
"parent": "account_management"
},
"22": {
"description": "Pin entry menu.",
"display_key": "ussd.kenya.display_metadata_pin_authorization",
"display_key": "ussd.display_metadata_pin_authorization",
"name": "display_metadata_pin_authorization",
"parent": "start"
},
"23": {
"description": "Exit menu.",
"display_key": "ussd.kenya.exit",
"display_key": "ussd.exit",
"name": "exit",
"parent": null
},
"24": {
"description": "Invalid menu option.",
"display_key": "ussd.kenya.exit_invalid_menu_option",
"display_key": "ussd.exit_invalid_menu_option",
"name": "exit_invalid_menu_option",
"parent": null
},
"25": {
"description": "Pin policy violation.",
"display_key": "ussd.kenya.exit_invalid_pin",
"display_key": "ussd.exit_invalid_pin",
"name": "exit_invalid_pin",
"parent": null
},
"26": {
"description": "Pin mismatch. New pin and the new pin confirmation do not match",
"display_key": "ussd.kenya.exit_pin_mismatch",
"display_key": "ussd.exit_pin_mismatch",
"name": "exit_pin_mismatch",
"parent": null
},
"27": {
"description": "Ussd pin blocked Menu",
"display_key": "ussd.kenya.exit_pin_blocked",
"display_key": "ussd.exit_pin_blocked",
"name": "exit_pin_blocked",
"parent": null
},
"28": {
"description": "Key params missing in request.",
"display_key": "ussd.kenya.exit_invalid_request",
"display_key": "ussd.exit_invalid_request",
"name": "exit_invalid_request",
"parent": null
},
"29": {
"description": "The user did not select a choice.",
"display_key": "ussd.kenya.exit_invalid_input",
"display_key": "ussd.exit_invalid_input",
"name": "exit_invalid_input",
"parent": null
},
"30": {
"description": "Exit following unsuccessful transaction due to insufficient account balance.",
"display_key": "ussd.kenya.exit_insufficient_balance",
"display_key": "ussd.exit_insufficient_balance",
"name": "exit_insufficient_balance",
"parent": null
},
"31": {
"description": "Exit following a successful transaction.",
"display_key": "ussd.kenya.exit_successful_transaction",
"display_key": "ussd.exit_successful_transaction",
"name": "exit_successful_transaction",
"parent": null
},
"32": {
"description": "End of a menu flow.",
"display_key": "ussd.kenya.complete",
"display_key": "ussd.complete",
"name": "complete",
"parent": null
},
"33": {
"description": "Pin entry menu to view account balances.",
"display_key": "ussd.kenya.account_balances_pin_authorization",
"display_key": "ussd.account_balances_pin_authorization",
"name": "account_balances_pin_authorization",
"parent": "account_management"
},
"34": {
"description": "Pin entry menu to view account statement.",
"display_key": "ussd.kenya.account_statement_pin_authorization",
"display_key": "ussd.account_statement_pin_authorization",
"name": "account_statement_pin_authorization",
"parent": "account_management"
},
"35": {
"description": "Menu to display account balances.",
"display_key": "ussd.kenya.account_balances",
"display_key": "ussd.account_balances",
"name": "account_balances",
"parent": "account_management"
},
"36": {
"description": "Menu to display first set of transactions in statement.",
"display_key": "ussd.kenya.first_transaction_set",
"display_key": "ussd.first_transaction_set",
"name": "first_transaction_set",
"parent": null
"parent": "account_management"
},
"37": {
"description": "Menu to display middle set of transactions in statement.",
"display_key": "ussd.kenya.middle_transaction_set",
"display_key": "ussd.middle_transaction_set",
"name": "middle_transaction_set",
"parent": null
},
"38": {
"description": "Menu to display last set of transactions in statement.",
"display_key": "ussd.kenya.last_transaction_set",
"display_key": "ussd.last_transaction_set",
"name": "last_transaction_set",
"parent": null
},
"39": {
"description": "Menu to instruct users to call the office.",
"display_key": "ussd.kenya.help",
"display_key": "ussd.help",
"name": "help",
"parent": null
},
"40": {
"description": "Menu to display a user's entire profile",
"display_key": "ussd.kenya.display_user_metadata",
"display_key": "ussd.display_user_metadata",
"name": "display_user_metadata",
"parent": "metadata_management"
},
"41": {
"description": "The recipient is not in the system",
"display_key": "ussd.kenya.exit_invalid_recipient",
"display_key": "ussd.exit_invalid_recipient",
"name": "exit_invalid_recipient",
"parent": null
},
"42": {
"description": "Pin entry menu for changing name data.",
"display_key": "ussd.kenya.name_edit_pin_authorization",
"display_key": "ussd.name_edit_pin_authorization",
"name": "name_edit_pin_authorization",
"parent": "metadata_management"
},
"43": {
"description": "Pin entry menu for changing gender data.",
"display_key": "ussd.kenya.gender_edit_pin_authorization",
"display_key": "ussd.gender_edit_pin_authorization",
"name": "gender_edit_pin_authorization",
"parent": "metadata_management"
},
"44": {
"description": "Pin entry menu for changing location data.",
"display_key": "ussd.kenya.location_edit_pin_authorization",
"display_key": "ussd.location_edit_pin_authorization",
"name": "location_edit_pin_authorization",
"parent": "metadata_management"
},
"45": {
"description": "Pin entry menu for changing products data.",
"display_key": "ussd.kenya.products_edit_pin_authorization",
"display_key": "ussd.products_edit_pin_authorization",
"name": "products_edit_pin_authorization",
"parent": "metadata_management"
},
"46": {
"description": "Pin confirmation for pin change.",
"display_key": "ussd.kenya.new_pin_confirmation",
"display_key": "ussd.new_pin_confirmation",
"name": "new_pin_confirmation",
"parent": "metadata_management"
},
"47": {
"description": "Year of birth entry menu.",
"display_key": "ussd.kenya.enter_date_of_birth",
"display_key": "ussd.enter_date_of_birth",
"name": "enter_date_of_birth",
"parent": "metadata_management"
},
"48": {
"description": "Pin entry menu for changing year of birth data.",
"display_key": "ussd.kenya.dob_edit_pin_authorization",
"display_key": "ussd.dob_edit_pin_authorization",
"name": "dob_edit_pin_authorization",
"parent": "metadata_management"
},
"49": {
"description": "Menu to display first set of tokens in the account's token list.",
"display_key": "ussd.kenya.first_account_tokens_set",
"display_key": "ussd.first_account_tokens_set",
"name": "first_account_tokens_set",
"parent": null
"parent": "start"
},
"50": {
"description": "Menu to display middle set of tokens in the account's token list.",
"display_key": "ussd.kenya.middle_account_tokens_set",
"display_key": "ussd.middle_account_tokens_set",
"name": "middle_account_tokens_set",
"parent": null
},
"51": {
"description": "Menu to display last set of tokens in the account's token list.",
"display_key": "ussd.kenya.last_account_tokens_set",
"display_key": "ussd.last_account_tokens_set",
"name": "last_account_tokens_set",
"parent": null
},
"52": {
"description": "Pin entry menu for setting an active token.",
"display_key": "ussd.kenya.token_selection_pin_authorization",
"display_key": "ussd.token_selection_pin_authorization",
"name": "token_selection_pin_authorization",
"parent": null
"parent": "first_account_tokens_set"
},
"53": {
"description": "Exit following a successful active token setting.",
"display_key": "ussd.kenya.exit_successful_token_selection",
"display_key": "ussd.exit_successful_token_selection",
"name": "exit_successful_token_selection",
"parent": null
},
"54": {
"description": "Pin management menu for operations related to an account's pin.",
"display_key": "ussd.kenya.pin_management",
"display_key": "ussd.pin_management",
"name": "pin_management",
"parent": "start"
},
"55": {
"description": "Phone number entry for account whose pin is being reset.",
"display_key": "ussd.kenya.reset_guarded_pin",
"display_key": "ussd.reset_guarded_pin",
"name": "reset_guarded_pin",
"parent": "pin_management"
},
"56": {
"description": "Pin entry for initiating request to reset an account's pin.",
"display_key": "ussd.kenya.reset_guarded_pin_authorization",
"display_key": "ussd.reset_guarded_pin_authorization",
"name": "reset_guarded_pin_authorization",
"parent": "pin_management"
},
"57": {
"description": "Exit menu following successful pin reset initiation.",
"display_key": "ussd.kenya.exit_pin_reset_initiated_success",
"display_key": "ussd.exit_pin_reset_initiated_success",
"name": "exit_pin_reset_initiated_success",
"parent": "pin_management"
},
"58": {
"description": "Exit menu in the event that an account is not a set guardian.",
"display_key": "ussd.kenya.exit_not_authorized_for_pin_reset",
"display_key": "ussd.exit_not_authorized_for_pin_reset",
"name": "exit_not_authorized_for_pin_reset",
"parent": "pin_management"
},
"59": {
"description": "Pin guard menu for handling guardianship operations.",
"display_key": "ussd.kenya.guard_pin",
"display_key": "ussd.guard_pin",
"name": "guard_pin",
"parent": "pin_management"
},
"60": {
"description": "Pin entry to display a list of set guardians.",
"display_key": "ussd.kenya.guardian_list_pin_authorization",
"display_key": "ussd.guardian_list_pin_authorization",
"name": "guardian_list_pin_authorization",
"parent": "guard_pin"
},
"61": {
"description": "Menu to display list of set guardians.",
"display_key": "ussd.kenya.guardian_list",
"display_key": "ussd.guardian_list",
"name": "guardian_list",
"parent": "guard_pin"
},
"62": {
"description": "Phone number entry to add an account as a guardian to reset pin.",
"display_key": "ussd.kenya.add_guardian",
"display_key": "ussd.add_guardian",
"name": "add_guardian",
"parent": "guard_pin"
},
"63": {
"description": "Pin entry to confirm addition of an account as a guardian.",
"display_key": "ussd.kenya.add_guardian_pin_authorization",
"display_key": "ussd.add_guardian_pin_authorization",
"name": "add_guardian_pin_authorization",
"parent": "guard_pin"
},
"64": {
"description": "Exit menu when an account is successfully added as pin reset guardian.",
"display_key": "ussd.kenya.exit_guardian_addition_success",
"display_key": "ussd.exit_guardian_addition_success",
"name": "exit_guardian_addition_success",
"parent": "guard_pin"
},
"65": {
"description": "Phone number entry to remove an account as a guardian to reset pin.",
"display_key": "ussd.kenya.remove_guardian",
"display_key": "ussd.remove_guardian",
"name": "remove_guardian",
"parent": "guard_pin"
},
"66": {
"description": "Pin entry to confirm removal of an account as a guardian.",
"display_key": "ussd.kenya.remove_guardian_pin_authorization",
"display_key": "ussd.remove_guardian_pin_authorization",
"name": "remove_guardian_pin_authorization",
"parent": "guard_pin"
},
"67": {
"description": "Exit menu when an account is successfully removed as pin reset guardian.",
"display_key": "ussd.kenya.exit_guardian_removal_success",
"display_key": "ussd.exit_guardian_removal_success",
"name": "exit_guardian_removal_success",
"parent": "guard_pin"
},
"68": {
"description": "Exit menu when invalid phone number entry for guardian addition. ",
"display_key": "ussd.kenya.exit_invalid_guardian_addition",
"description": "Exit menu when invalid phone number entry for guardian addition.",
"display_key": "ussd.exit_invalid_guardian_addition",
"name": "exit_invalid_guardian_addition",
"parent": "guard_pin"
},
"69": {
"description": "Exit menu when invalid phone number entry for guardian removal. ",
"display_key": "ussd.kenya.exit_invalid_guardian_removal",
"description": "Exit menu when invalid phone number entry for guardian removal.",
"display_key": "ussd.exit_invalid_guardian_removal",
"name": "exit_invalid_guardian_removal",
"parent": "guard_pin"
},
"70": {
"description": "Menu to display middle set of languages to select.",
"display_key": "ussd.initial_middle_language_set",
"name": "initial_middle_language_set",
"parent": null
},
"71": {
"description": "Menu to display last set of languages to select.",
"display_key": "ussd.initial_last_language_set",
"name": "initial_last_language_set",
"parent": null
},
"72": {
"description": "Menu to display middle set of languages to select.",
"display_key": "ussd.middle_language_set",
"name": "middle_language_set",
"parent": null
},
"73": {
"description": "Menu to display last set of languages to select.",
"display_key": "ussd.last_language_set",
"name": "last_language_set",
"parent": null
}
}
}

View File

@ -63,10 +63,7 @@ class Account(SessionBase):
def remove_guardian(self, phone_number: str):
set_guardians = self.guardians.split(',')
set_guardians.remove(phone_number)
if len(set_guardians) > 1:
self.guardians = ','.join(set_guardians)
else:
self.guardians = set_guardians[0]
self.guardians = ','.join(set_guardians)
def get_guardians(self) -> list:
return self.guardians.split(',') if self.guardians else []
@ -171,7 +168,7 @@ class Account(SessionBase):
return check_password_hash(password, self.password_hash)
def create(chain_str: str, phone_number: str, session: Session):
def create(chain_str: str, phone_number: str, session: Session, preferred_language: str):
"""
:param chain_str:
:type chain_str:
@ -179,12 +176,14 @@ def create(chain_str: str, phone_number: str, session: Session):
:type phone_number:
:param session:
:type session:
:param preferred_language:
:type preferred_language:
:return:
:rtype:
"""
api = Api(callback_task='cic_ussd.tasks.callback_handler.account_creation_callback',
callback_queue='cic-ussd',
callback_param='',
callback_param=preferred_language,
chain_str=chain_str)
task_uuid = api.create_account().id
TaskTracker.add(session=session, task_uuid=task_uuid)

View File

@ -52,4 +52,5 @@ class UnknownUssdRecipient(Exception):
"""Raised when a recipient of a transaction is not known to the ussd application."""
class MaxRetryReached(Exception):
"""Raised when the maximum number of retries defined for polling for the availability of a resource."""

View File

@ -7,3 +7,4 @@ from .custom import CustomMetadata
from .person import PersonMetadata
from .phone import PhonePointerMetadata
from .preferences import PreferencesMetadata
from .tokens import TokenMetadata

View File

@ -19,34 +19,34 @@ from cic_ussd.account.metadata import get_cached_preferred_language
from cic_ussd.account.statement import (
get_cached_statement,
parse_statement_transactions,
query_statement,
statement_transaction_set
)
query_statement)
from cic_ussd.account.tokens import (create_account_tokens_list,
get_active_token_symbol,
get_cached_token_data,
get_cached_token_symbol_list,
get_cached_token_data_list,
parse_token_list,
token_list_set)
parse_token_list)
from cic_ussd.account.transaction import from_wei, to_wei
from cic_ussd.cache import cache_data_key, cache_data
from cic_ussd.cache import cache_data_key, cache_data, get_cached_data
from cic_ussd.db.models.account import Account
from cic_ussd.metadata import PersonMetadata
from cic_ussd.phone_number import Support
from cic_ussd.processor.util import parse_person_metadata
from cic_ussd.processor.poller import wait_for_session_data
from cic_ussd.processor.util import parse_person_metadata, ussd_menu_list
from cic_ussd.session.ussd_session import save_session_data
from cic_ussd.state_machine.logic.language import preferred_langauge_from_selection
from cic_ussd.translation import translation_for
from sqlalchemy.orm.session import Session
logg = logging.getLogger(__name__)
logg = logging.getLogger(__file__)
class MenuProcessor:
def __init__(self, account: Account, display_key: str, menu_name: str, session: Session, ussd_session: dict):
self.account = account
self.display_key = display_key
self.identifier = bytes.fromhex(self.account.blockchain_address)
if account:
self.identifier = bytes.fromhex(self.account.blockchain_address)
self.menu_name = menu_name
self.session = session
self.ussd_session = ussd_session
@ -89,36 +89,29 @@ class MenuProcessor:
:rtype:
"""
cached_statement = get_cached_statement(self.account.blockchain_address)
transaction_sets = []
if cached_statement:
statement = json.loads(cached_statement)
statement_transactions = parse_statement_transactions(statement)
transaction_sets = [statement_transactions[tx:tx + 3] for tx in range(0, len(statement_transactions), 3)]
preferred_language = get_cached_preferred_language(self.account.blockchain_address)
if not preferred_language:
preferred_language = i18n.config.get('fallback')
no_transaction_history = statement_transaction_set(preferred_language, transaction_sets)
first_transaction_set = no_transaction_history
middle_transaction_set = no_transaction_history
last_transaction_set = no_transaction_history
if transaction_sets:
first_transaction_set = statement_transaction_set(preferred_language, transaction_sets[0])
if len(transaction_sets) >= 2:
middle_transaction_set = statement_transaction_set(preferred_language, transaction_sets[1])
if len(transaction_sets) >= 3:
last_transaction_set = statement_transaction_set(preferred_language, transaction_sets[2])
if self.display_key == 'ussd.kenya.first_transaction_set':
statement_list = []
if cached_statement:
statement_list = parse_statement_transactions(statement=json.loads(cached_statement))
fallback = translation_for('helpers.no_transaction_history', preferred_language)
transaction_sets = ussd_menu_list(fallback=fallback, menu_list=statement_list, split=3)
if self.display_key == 'ussd.first_transaction_set':
return translation_for(
self.display_key, preferred_language, first_transaction_set=first_transaction_set
self.display_key, preferred_language, first_transaction_set=transaction_sets[0]
)
if self.display_key == 'ussd.kenya.middle_transaction_set':
if self.display_key == 'ussd.middle_transaction_set':
return translation_for(
self.display_key, preferred_language, middle_transaction_set=middle_transaction_set
self.display_key, preferred_language, middle_transaction_set=transaction_sets[1]
)
if self.display_key == 'ussd.kenya.last_transaction_set':
if self.display_key == 'ussd.last_transaction_set':
return translation_for(
self.display_key, preferred_language, last_transaction_set=last_transaction_set
self.display_key, preferred_language, last_transaction_set=transaction_sets[2]
)
def add_guardian_pin_authorization(self):
@ -129,7 +122,7 @@ class MenuProcessor:
preferred_language = get_cached_preferred_language(self.account.blockchain_address)
if not preferred_language:
preferred_language = i18n.config.get('fallback')
set_guardians = self.account.get_guardians()
set_guardians = self.account.get_guardians()[:3]
if set_guardians:
guardians_list = ''
guardians_list_header = translation_for('helpers.guardians_list_header', preferred_language)
@ -145,36 +138,30 @@ class MenuProcessor:
def account_tokens(self) -> str:
cached_token_data_list = get_cached_token_data_list(self.account.blockchain_address)
token_data_list = parse_token_list(cached_token_data_list)
token_list_sets = [token_data_list[tds:tds + 3] for tds in range(0, len(token_data_list), 3)]
preferred_language = get_cached_preferred_language(self.account.blockchain_address)
if not preferred_language:
preferred_language = i18n.config.get('fallback')
no_token_list = token_list_set(preferred_language, [])
first_account_tokens_set = no_token_list
middle_account_tokens_set = no_token_list
last_account_tokens_set = no_token_list
if token_list_sets:
data = {
'account_tokens_list': cached_token_data_list
}
save_session_data(data=data, queue='cic-ussd', session=self.session, ussd_session=self.ussd_session)
first_account_tokens_set = token_list_set(preferred_language, token_list_sets[0])
if len(token_list_sets) >= 2:
middle_account_tokens_set = token_list_set(preferred_language, token_list_sets[1])
if len(token_list_sets) >= 3:
last_account_tokens_set = token_list_set(preferred_language, token_list_sets[2])
if self.display_key == 'ussd.kenya.first_account_tokens_set':
fallback = translation_for('helpers.no_tokens_list', preferred_language)
token_list_sets = ussd_menu_list(fallback=fallback, menu_list=token_data_list, split=3)
data = {
'account_tokens_list': cached_token_data_list
}
save_session_data(data=data, queue='cic-ussd', session=self.session, ussd_session=self.ussd_session)
if self.display_key == 'ussd.first_account_tokens_set':
return translation_for(
self.display_key, preferred_language, first_account_tokens_set=first_account_tokens_set
self.display_key, preferred_language, first_account_tokens_set=token_list_sets[0]
)
if self.display_key == 'ussd.kenya.middle_account_tokens_set':
if self.display_key == 'ussd.middle_account_tokens_set':
return translation_for(
self.display_key, preferred_language, middle_account_tokens_set=middle_account_tokens_set
self.display_key, preferred_language, middle_account_tokens_set=token_list_sets[1]
)
if self.display_key == 'ussd.kenya.last_account_tokens_set':
if self.display_key == 'ussd.last_account_tokens_set':
return translation_for(
self.display_key, preferred_language, last_account_tokens_set=last_account_tokens_set
self.display_key, preferred_language, last_account_tokens_set=token_list_sets[2]
)
def help(self) -> str:
@ -222,7 +209,7 @@ class MenuProcessor:
remaining_attempts = 3
remaining_attempts -= self.account.failed_pin_attempts
retry_pin_entry = translation_for(
'ussd.kenya.retry_pin_entry', preferred_language, remaining_attempts=remaining_attempts
'ussd.retry_pin_entry', preferred_language, remaining_attempts=remaining_attempts
)
return translation_for(
f'{self.display_key}.retry', preferred_language, retry_pin_entry=retry_pin_entry
@ -238,6 +225,38 @@ class MenuProcessor:
guardian = Account.get_by_phone_number(guardian_phone_number, self.session)
return guardian.standard_metadata_id()
def language(self):
key = cache_data_key('system:languages'.encode('utf-8'), MetadataPointer.NONE)
cached_system_languages = get_cached_data(key)
language_list: list = json.loads(cached_system_languages)
if self.account:
preferred_language = get_cached_preferred_language(self.account.blockchain_address)
else:
preferred_language = i18n.config.get('fallback')
fallback = translation_for('helpers.no_language_list', preferred_language)
language_list_sets = ussd_menu_list(fallback=fallback, menu_list=language_list, split=3)
if self.display_key in ['ussd.initial_language_selection', 'ussd.select_preferred_language']:
return translation_for(
self.display_key, preferred_language, first_language_set=language_list_sets[0]
)
if 'middle_language_set' in self.display_key:
return translation_for(
self.display_key, preferred_language, middle_language_set=language_list_sets[1]
)
if 'last_language_set' in self.display_key:
return translation_for(
self.display_key, preferred_language, last_language_set=language_list_sets[2]
)
def account_creation_prompt(self):
preferred_language = preferred_langauge_from_selection(self.ussd_session.get('user_input'))
return translation_for(self.display_key, preferred_language)
def reset_guarded_pin_authorization(self):
guarded_account_information = self.guarded_account_metadata()
return self.pin_authorization(guarded_account_information=guarded_account_information)
@ -381,8 +400,9 @@ class MenuProcessor:
)
def exit_invalid_menu_option(self):
preferred_language = get_cached_preferred_language(self.account.blockchain_address)
if not preferred_language:
if self.account:
preferred_language = get_cached_preferred_language(self.account.blockchain_address)
else:
preferred_language = i18n.config.get('fallback')
return translation_for(self.display_key, preferred_language, support_phone=Support.phone_number)
@ -390,7 +410,7 @@ class MenuProcessor:
preferred_language = get_cached_preferred_language(self.account.blockchain_address)
if not preferred_language:
preferred_language = i18n.config.get('fallback')
return translation_for('ussd.kenya.exit_pin_blocked', preferred_language, support_phone=Support.phone_number)
return translation_for('ussd.exit_pin_blocked', preferred_language, support_phone=Support.phone_number)
def exit_successful_token_selection(self) -> str:
selected_token = self.ussd_session.get('data').get('selected_token')
@ -398,7 +418,7 @@ class MenuProcessor:
preferred_language = get_cached_preferred_language(self.account.blockchain_address)
if not preferred_language:
preferred_language = i18n.config.get('fallback')
return translation_for(self.display_key,preferred_language,token_symbol=token_symbol)
return translation_for(self.display_key, preferred_language, token_symbol=token_symbol)
def exit_successful_transaction(self):
"""
@ -445,6 +465,9 @@ def response(account: Account, display_key: str, menu_name: str, session: Sessio
"""
menu_processor = MenuProcessor(account, display_key, menu_name, session, ussd_session)
if menu_name == 'account_creation_prompt':
return menu_processor.account_creation_prompt()
if menu_name == 'start':
return menu_processor.start_menu()
@ -502,6 +525,9 @@ def response(account: Account, display_key: str, menu_name: str, session: Sessio
if 'account_tokens_set' in menu_name:
return menu_processor.account_tokens()
if 'language' in menu_name:
return menu_processor.language()
if menu_name == 'display_user_metadata':
return menu_processor.person_metadata()
@ -515,5 +541,4 @@ def response(account: Account, display_key: str, menu_name: str, session: Sessio
return menu_processor.exit_successful_token_selection()
preferred_language = get_cached_preferred_language(account.blockchain_address)
return translation_for(display_key, preferred_language)

View File

@ -0,0 +1,104 @@
# standard imports
import logging
import time
from queue import Queue
from typing import Callable, Dict, Optional, Tuple, Union
# external imports
from cic_types.condiments import MetadataPointer
# local imports
from cic_ussd.cache import cache_data_key, get_cached_data
from cic_ussd.error import MaxRetryReached
logg = logging.getLogger()
# adapted from https://github.com/justiniso/polling/blob/master/polling.py
# opted not to use the package to reduce dependency
def poller(args: Optional[Tuple],
interval: int,
kwargs: Optional[Dict],
max_retry: int,
target: Callable[..., Union[Dict, str]]):
""""""
collected_values: list = []
expected_value = None
tries = 0
while True:
if tries >= max_retry:
raise MaxRetryReached(collected_values, expected_value)
try:
if args:
value = target(*args)
elif kwargs:
value = target(**kwargs)
else:
value = target()
expected_value = value
except () as error:
expected_value = error
else:
if bool(value) or value == {}:
logg.debug(f'Resource: {expected_value} now available.')
break
collected_values.append(expected_value)
logg.debug(f'Collected values are: {collected_values}')
tries += 1
time.sleep(interval)
def wait_for_cache(identifier: Union[list, bytes],
resource_name: str,
salt: MetadataPointer,
interval: int = 1,
max_retry: int = 5):
"""
:param identifier:
:type identifier:
:param interval:
:type interval:
:param resource_name:
:type resource_name:
:param salt:
:type salt:
:param max_retry:
:type max_retry:
:return:
:rtype:
"""
key: str = cache_data_key(identifier=identifier, salt=salt)
logg.debug(f'Polling for resource: {resource_name} at: {key} every: {interval} second(s) for {max_retry} seconds.')
poller(args=(key,), interval=interval, kwargs=None, max_retry=max_retry, target=get_cached_data)
def wait_for_session_data(resource_name: str,
session_data_key: str,
ussd_session: dict,
interval: int = 1,
max_retry: int = 5):
"""
:param interval:
:type interval:
:param resource_name:
:type resource_name:
:param session_data_key:
:type session_data_key:
:param ussd_session:
:type ussd_session:
:param max_retry:
:type max_retry:
:return:
:rtype:
"""
# poll for data element first
logg.debug(f'Data poller with max retry at: {max_retry}. Checking for every: {interval} seconds.')
poller(args=('data',), interval=interval, kwargs=None, max_retry=max_retry, target=ussd_session.get)
# poll for session data element
get_session_data = ussd_session.get('data').get
logg.debug(f'Session data poller for: {resource_name} with max retry at: {max_retry}. Checking for every: {interval} seconds.')
poller(args=(session_data_key,), interval=interval, kwargs=None, max_retry=max_retry, target=get_session_data)

View File

@ -8,7 +8,7 @@ from sqlalchemy.orm.session import Session
from tinydb.table import Document
# local imports
from cic_ussd.db.models.account import Account, create
from cic_ussd.db.models.account import Account
from cic_ussd.db.models.base import SessionBase
from cic_ussd.db.models.ussd_session import UssdSession
from cic_ussd.menu.ussd_menu import UssdMenu
@ -16,7 +16,6 @@ from cic_ussd.processor.menu import response
from cic_ussd.processor.util import latest_input, resume_last_ussd_session
from cic_ussd.session.ussd_session import create_or_update_session, persist_ussd_session
from cic_ussd.state_machine import UssdStateMachine
from cic_ussd.translation import translation_for
from cic_ussd.validator import is_valid_response
@ -36,9 +35,6 @@ def handle_menu(account: Account, session: Session) -> Document:
last_ussd_session = UssdSession.last_ussd_session(account.phone_number, session)
if last_ussd_session:
return resume_last_ussd_session(last_ussd_session.state)
elif not account.has_preferred_language():
return UssdMenu.find_by_name('initial_language_selection')
else:
return UssdMenu.find_by_name('initial_pin_entry')
@ -71,16 +67,13 @@ def get_menu(account: Account,
return UssdMenu.find_by_name(state)
def handle_menu_operations(chain_str: str,
external_session_id: str,
def handle_menu_operations(external_session_id: str,
phone_number: str,
queue: str,
service_code: str,
session,
user_input: str):
"""
:param chain_str:
:type chain_str:
:param external_session_id:
:type external_session_id:
:param phone_number:
@ -100,10 +93,38 @@ def handle_menu_operations(chain_str: str,
account: Account = Account.get_by_phone_number(phone_number, session)
if account:
return handle_account_menu_operations(account, external_session_id, queue, session, service_code, user_input)
create(chain_str, phone_number, session)
menu = UssdMenu.find_by_name('account_creation_prompt')
preferred_language = i18n.config.get('fallback')
create_or_update_session(
else:
return handle_no_account_menu_operations(
account, external_session_id, phone_number, queue, session, service_code, user_input)
def handle_no_account_menu_operations(account: Optional[Account],
external_session_id: str,
phone_number: str,
queue: str,
session: Session,
service_code: str,
user_input: str):
"""
:param account:
:type account:
:param external_session_id:
:type external_session_id:
:param phone_number:
:type phone_number:
:param queue:
:type queue:
:param session:
:type session:
:param service_code:
:type service_code:
:param user_input:
:type user_input:
:return:
:rtype:
"""
menu = UssdMenu.find_by_name('initial_language_selection')
ussd_session = create_or_update_session(
external_session_id=external_session_id,
msisdn=phone_number,
service_code=service_code,
@ -111,7 +132,20 @@ def handle_menu_operations(chain_str: str,
session=session,
user_input=user_input)
persist_ussd_session(external_session_id, queue)
return translation_for('ussd.kenya.account_creation_prompt', preferred_language)
last_ussd_session: UssdSession = UssdSession.last_ussd_session(phone_number, session)
if last_ussd_session:
if not user_input:
menu = resume_last_ussd_session(last_ussd_session.state)
else:
session = SessionBase.bind_session(session)
state = next_state(account, session, user_input, last_ussd_session.to_json())
menu = UssdMenu.find_by_name(state)
return response(account=account,
display_key=menu.get('display_key'),
menu_name=menu.get('name'),
session=session,
ussd_session=ussd_session.to_json())
def handle_account_menu_operations(account: Account,
@ -152,15 +186,12 @@ def handle_account_menu_operations(account: Account,
if last_ussd_session:
ussd_session = create_or_update_session(
external_session_id, phone_number, service_code, user_input, menu.get('name'), session,
last_ussd_session.data
)
last_ussd_session.data)
else:
ussd_session = create_or_update_session(
external_session_id, phone_number, service_code, user_input, menu.get('name'), session, None
)
external_session_id, phone_number, service_code, user_input, menu.get('name'), session, {})
menu_response = response(
account, menu.get('display_key'), menu.get('name'), session, ussd_session.to_json()
)
account, menu.get('display_key'), menu.get('name'), session, ussd_session.to_json())
if not is_valid_response(menu_response):
raise ValueError(f'Invalid response: {response}')
persist_ussd_session(external_session_id, queue)

View File

@ -3,7 +3,7 @@ import datetime
import json
import logging
import time
from typing import Union
from typing import List, Union
# external imports
from cic_types.condiments import MetadataPointer
@ -21,9 +21,7 @@ logg = logging.getLogger(__file__)
def latest_input(user_input: str) -> str:
"""
:param user_input:
:type user_input:
:return:
:rtype:
"""
return user_input.split('*')[-1]
@ -85,64 +83,22 @@ def resume_last_ussd_session(last_state: str) -> Document:
return UssdMenu.find_by_name(last_state)
def wait_for_cache(identifier: Union[list, bytes], resource_name: str, salt: MetadataPointer, interval: int = 1, max_retry: int = 5):
def ussd_menu_list(fallback: str, menu_list: list, split: int = 3) -> List[str]:
"""
:param identifier:
:type identifier:
:param interval:
:type interval:
:param resource_name:
:type resource_name:
:param salt:
:type salt:
:param max_retry:
:type max_retry:
:param fallback:
:type fallback:
:param menu_list:
:type menu_list:
:param split:
:type split:
:return:
:rtype:
"""
key = cache_data_key(identifier=identifier, salt=salt)
resource = get_cached_data(key)
counter = 0
while resource is None:
logg.debug(f'Waiting for: {resource_name} at: {key}. Checking after: {interval} ...')
time.sleep(interval)
counter += 1
resource = get_cached_data(key)
if resource is not None:
logg.debug(f'{resource_name} now available.')
break
else:
if counter == max_retry:
logg.debug(f'Could not find: {resource_name} within: {max_retry}')
break
def wait_for_session_data(resource_name: str, session_data_key: str, ussd_session: dict, interval: int = 1, max_retry: int = 5):
"""
:param interval:
:type interval:
:param resource_name:
:type resource_name:
:param session_data_key:
:type session_data_key:
:param ussd_session:
:type ussd_session:
:param max_retry:
:type max_retry:
:return:
:rtype:
"""
session_data = ussd_session.get('data').get(session_data_key)
counter = 0
while session_data is None:
logg.debug(f'Waiting for: {resource_name}. Checking after: {interval} ...')
time.sleep(interval)
counter += 1
session_data = ussd_session.get('data').get(session_data_key)
if session_data is not None:
logg.debug(f'{resource_name} now available.')
break
else:
if counter == max_retry:
logg.debug(f'Could not find: {resource_name} within: {max_retry}')
break
menu_list_sets = [menu_list[item:item + split] for item in range(0, len(menu_list), split)]
menu_list_reprs = []
for i in range(split):
try:
menu_list_reprs.append(''.join(f'{list_set_item}\n' for list_set_item in menu_list_sets[i]).rstrip('\n'))
except IndexError:
menu_list_reprs.append(fallback)
return menu_list_reprs

View File

@ -20,6 +20,7 @@ from cic_ussd.db import dsn_from_config
from cic_ussd.db.models.base import SessionBase
from cic_ussd.phone_number import Support
from cic_ussd.session.ussd_session import UssdSession as InMemoryUssdSession
from cic_ussd.translation import generate_locale_files
from cic_ussd.validator import validate_presence
logging.basicConfig(level=logging.WARNING)
@ -83,6 +84,10 @@ if key_file_path:
validate_presence(path=key_file_path)
Signer.key_file_path = key_file_path
generate_locale_files(locale_dir=config.get('LOCALE_PATH'),
schema_file_path=config.get('SCHEMA_FILE_PATH'),
translation_builder_path=config.get('LOCALE_FILE_BUILDERS'))
# set up translations
i18n.load_path.append(config.get('LOCALE_PATH'))
i18n.set('fallback', config.get('LOCALE_FALLBACK'))

View File

@ -18,6 +18,7 @@ from cic_types.ext.metadata.signer import Signer
# local imports
from cic_ussd.account.chain import Chain
from cic_ussd.account.guardianship import Guardianship
from cic_ussd.account.tokens import query_default_token
from cic_ussd.cache import cache_data, cache_data_key, Cache
from cic_ussd.db import dsn_from_config
@ -33,7 +34,7 @@ from cic_ussd.processor.ussd import handle_menu_operations
from cic_ussd.runnable.server_base import exportable_parser, logg
from cic_ussd.session.ussd_session import UssdSession as InMemoryUssdSession
from cic_ussd.state_machine import UssdStateMachine
from cic_ussd.translation import translation_for
from cic_ussd.translation import generate_locale_files, Languages, translation_for
from cic_ussd.validator import check_ip, check_request_content_length, validate_phone_number, validate_presence
args = exportable_parser.parse_args()
@ -56,10 +57,6 @@ SessionBase.connect(data_source_name,
pool_size=int(config.get('DATABASE_POOL_SIZE')),
debug=config.true('DATABASE_DEBUG'))
# set up translations
i18n.load_path.append(config.get('LOCALE_PATH'))
i18n.set('fallback', config.get('LOCALE_FALLBACK'))
# set Fernet key
PasswordEncoder.set_key(config.get('APP_PASSWORD_PEPPER'))
@ -121,6 +118,22 @@ valid_service_codes = config.get('USSD_SERVICE_CODE').split(",")
E164Format.region = config.get('E164_REGION')
Support.phone_number = config.get('OFFICE_SUPPORT_PHONE')
validate_presence(config.get('SYSTEM_GUARDIANS_FILE'))
Guardianship.load_system_guardians(config.get('SYSTEM_GUARDIANS_FILE'))
generate_locale_files(locale_dir=config.get('LOCALE_PATH'),
schema_file_path=config.get('SCHEMA_FILE_PATH'),
translation_builder_path=config.get('LOCALE_FILE_BUILDERS'))
# set up translations
i18n.load_path.append(config.get('LOCALE_PATH'))
i18n.set('fallback', config.get('LOCALE_FALLBACK'))
validate_presence(config.get('LANGUAGES_FILE'))
Languages.load_languages_dict(config.get('LANGUAGES_FILE'))
languages = Languages()
languages.cache_system_languages()
def application(env, start_response):
"""Loads python code for application to be accessible over web server
@ -175,7 +188,7 @@ def application(env, start_response):
if service_code not in valid_service_codes:
response = translation_for(
'ussd.kenya.invalid_service_code',
'ussd.invalid_service_code',
i18n.config.get('fallback'),
valid_service_code=valid_service_codes[0]
)
@ -189,9 +202,7 @@ def application(env, start_response):
return []
logg.debug('session {} started for {}'.format(external_session_id, phone_number))
response = handle_menu_operations(
chain_str, external_session_id, phone_number, args.q, service_code, session, user_input
)
response = handle_menu_operations(external_session_id, phone_number, args.q, service_code, session, user_input)
response_bytes, headers = with_content_headers(headers, response)
start_response('200 OK,', headers)
session.commit()

View File

@ -11,46 +11,20 @@ from cic_types.models.person import get_contact_data_from_vcard, generate_vcard_
# local imports
from cic_ussd.account.chain import Chain
from cic_ussd.account.maps import gender, language
from cic_ussd.account.maps import gender
from cic_ussd.account.metadata import get_cached_preferred_language
from cic_ussd.db.models.account import Account
from cic_ussd.db.models.account import Account, create
from cic_ussd.db.models.base import SessionBase
from cic_ussd.error import MetadataNotFoundError
from cic_ussd.metadata import PersonMetadata
from cic_ussd.session.ussd_session import save_session_data
from cic_ussd.state_machine.logic.language import preferred_langauge_from_selection
from cic_ussd.translation import translation_for
from sqlalchemy.orm.session import Session
logg = logging.getLogger(__file__)
def change_preferred_language(state_machine_data: Tuple[str, dict, Account, Session]):
"""
:param state_machine_data:
:type state_machine_data:
:return:
:rtype:
"""
user_input, ussd_session, account, session = state_machine_data
r_user_input = language().get(user_input)
session = SessionBase.bind_session(session)
account.preferred_language = r_user_input
session.add(account)
session.flush()
SessionBase.release_session(session)
preferences_data = {
'preferred_language': r_user_input
}
s = celery.signature(
'cic_ussd.tasks.metadata.add_preferences_metadata',
[account.blockchain_address, preferences_data],
queue='cic-ussd'
)
return s.apply_async()
def update_account_status_to_active(state_machine_data: Tuple[str, dict, Account, Session]):
"""This function sets user's account to active.
:param state_machine_data: A tuple containing user input, a ussd session and user object.
@ -245,3 +219,16 @@ def edit_user_metadata_attribute(state_machine_data: Tuple[str, dict, Account, S
[blockchain_address, parsed_person_metadata]
)
s_edit_person_metadata.apply_async(queue='cic-ussd')
def process_account_creation(state_machine_data: Tuple[str, dict, Account, Session]):
"""
:param state_machine_data:
:type state_machine_data:
:return:
:rtype:
"""
user_input, ussd_session, account, session = state_machine_data
preferred_language = preferred_langauge_from_selection(user_input=user_input)
chain_str = Chain.spec.__str__()
create(chain_str, ussd_session.get('msisdn'), session, preferred_language)

View File

@ -0,0 +1,95 @@
# standard imports
import json
from typing import Tuple
# external imports
import celery
import i18n
from cic_types.condiments import MetadataPointer
from sqlalchemy.orm.session import Session
# local imports
from cic_ussd.cache import cache_data_key, get_cached_data
from cic_ussd.db.models.account import Account
from cic_ussd.processor.poller import wait_for_cache, wait_for_session_data
from cic_ussd.session.ussd_session import save_session_data
from cic_ussd.translation import Languages
def is_valid_language_selection(state_machine_data: Tuple[str, dict, Account, Session]):
"""
:param state_machine_data:
:type state_machine_data:
:return:
:rtype:
"""
user_input, ussd_session, account, session = state_machine_data
key = cache_data_key('system:languages'.encode('utf-8'), MetadataPointer.NONE)
cached_system_languages = get_cached_data(key)
language_list = json.loads(cached_system_languages)
if not language_list:
wait_for_cache(identifier='system:languages'.encode('utf-8'), resource_name='Languages list', salt=MetadataPointer.NONE)
if user_input in ['00', '11', '22']:
return False
user_input = int(user_input)
return user_input <= len(language_list)
def change_preferred_language(state_machine_data: Tuple[str, dict, Account, Session]):
"""
:param state_machine_data:
:type state_machine_data:
:return:
:rtype:
"""
process_language_selection(state_machine_data=state_machine_data)
user_input, ussd_session, account, session = state_machine_data
wait_for_session_data(resource_name='Preferred language', session_data_key='preferred_language', ussd_session=ussd_session)
preferred_language = ussd_session.get('data').get('preferred_language')
preferences_data = {
'preferred_language': preferred_language
}
s = celery.signature(
'cic_ussd.tasks.metadata.add_preferences_metadata',
[account.blockchain_address, preferences_data],
queue='cic-ussd'
)
return s.apply_async()
def process_language_selection(state_machine_data: Tuple[str, dict, Account, Session]):
"""
:param state_machine_data:
:type state_machine_data:
:return:
:rtype:
"""
user_input, ussd_session, account, session = state_machine_data
preferred_language = preferred_langauge_from_selection(user_input=user_input)
data = {
'preferred_language': preferred_language
}
save_session_data(queue='cic-ussd', session=session, data=data, ussd_session=ussd_session)
def preferred_langauge_from_selection(user_input: str):
"""
:param user_input:
:type user_input:
:return:
:rtype:
"""
key = cache_data_key('system:languages'.encode('utf-8'), MetadataPointer.NONE)
cached_system_languages = get_cached_data(key)
language_list = json.loads(cached_system_languages)
user_input = int(user_input)
selected_language = language_list[user_input - 1]
preferred_language = i18n.config.get('fallback')
for key, value in Languages.languages_dict.items():
if selected_language[3:] == value:
preferred_language = key
return preferred_language

View File

@ -15,7 +15,7 @@ from cic_ussd.db.models.account import Account
from cic_ussd.db.models.base import SessionBase
from cic_ussd.db.enum import AccountStatus
from cic_ussd.encoder import create_password_hash, check_password_hash
from cic_ussd.processor.util import wait_for_session_data
from cic_ussd.processor.poller import wait_for_session_data
from cic_ussd.session.ussd_session import create_or_update_session, persist_ussd_session

View File

@ -9,9 +9,11 @@ from phonenumbers.phonenumberutil import NumberParseException
from sqlalchemy.orm.session import Session
# local imports
from cic_ussd.account.guardianship import Guardianship
from cic_ussd.account.metadata import get_cached_preferred_language
from cic_ussd.db.models.account import Account
from cic_ussd.db.models.base import SessionBase
from cic_ussd.notifications import Notifier
from cic_ussd.phone_number import process_phone_number, E164Format
from cic_ussd.session.ussd_session import save_session_data
from cic_ussd.translation import translation_for
@ -82,8 +84,10 @@ def is_valid_guardian_addition(state_machine_data: Tuple[str, dict, Account, Ses
preferred_language = i18n.config.get('fallback')
is_valid_account = Account.get_by_phone_number(phone_number, session) is not None
guardianship = Guardianship()
is_system_guardian = guardianship.is_system_guardian(phone_number)
is_initiator = phone_number == account.phone_number
is_existent_guardian = phone_number in account.get_guardians()
is_existent_guardian = phone_number in account.get_guardians() or is_system_guardian
failure_reason = ''
if not is_valid_account:
@ -100,7 +104,7 @@ def is_valid_guardian_addition(state_machine_data: Tuple[str, dict, Account, Ses
session_data['failure_reason'] = failure_reason
save_session_data('cic-ussd', session, session_data, ussd_session)
return phone_number is not None and is_valid_account and not is_existent_guardian and not is_initiator
return phone_number is not None and is_valid_account and not is_existent_guardian and not is_initiator and not is_system_guardian
def add_pin_guardian(state_machine_data: Tuple[str, dict, Account, Session]):
@ -130,6 +134,9 @@ def is_set_pin_guardian(account: Account, checked_number: str, preferred_languag
is_set_guardian = checked_number in set_guardians
is_initiator = checked_number == account.phone_number
guardianship = Guardianship()
is_system_guardian = guardianship.is_system_guardian(checked_number)
if not is_set_guardian:
failure_reason = translation_for('helpers.error.is_not_existent_guardian', preferred_language)
@ -141,7 +148,7 @@ def is_set_pin_guardian(account: Account, checked_number: str, preferred_languag
session_data['failure_reason'] = failure_reason
save_session_data('cic-ussd', session, session_data, ussd_session)
return is_set_guardian and not is_initiator
return (is_set_guardian or is_system_guardian) and not is_initiator
def is_dialers_pin_guardian(state_machine_data: Tuple[str, dict, Account, Session]):
@ -193,8 +200,20 @@ def initiate_pin_reset(state_machine_data: Tuple[str, dict, Account, Session]):
save_session_data('cic-ussd', session, session_data, ussd_session)
guarded_account_phone_number = session_data.get('guarded_account_phone_number')
guarded_account = Account.get_by_phone_number(guarded_account_phone_number, session)
if quorum_count >= guarded_account.guardian_quora:
guarded_account.reset_pin(session)
logg.debug(f'Reset initiated for: {guarded_account.phone_number}')
session_data['quorum_count'] = 0
save_session_data('cic-ussd', session, session_data, ussd_session)
preferred_language = get_cached_preferred_language(guarded_account.blockchain_address)
if not preferred_language:
preferred_language = i18n.config.get('fallback')
notifier = Notifier()
notifier.send_sms_notification(
key='sms.pin_reset_initiated',
phone_number=guarded_account.phone_number,
preferred_language=preferred_language,
pin_initiator=account.standard_metadata_id())

View File

@ -7,7 +7,7 @@ from sqlalchemy.orm.session import Session
# local imports
from cic_ussd.account.tokens import set_active_token
from cic_ussd.db.models.account import Account
from cic_ussd.processor.util import wait_for_session_data
from cic_ussd.processor.poller import wait_for_session_data
from cic_ussd.session.ussd_session import save_session_data
@ -23,7 +23,7 @@ def is_valid_token_selection(state_machine_data: Tuple[str, dict, Account, Sessi
account_tokens_list = session_data.get('account_tokens_list')
if not account_tokens_list:
wait_for_session_data('Account token list', session_data_key='account_tokens_list', ussd_session=ussd_session)
if user_input not in ['00', '22']:
if user_input not in ['00', '11', '22']:
try:
user_input = int(user_input)
return user_input <= len(account_tokens_list)

View File

@ -15,7 +15,7 @@ from cic_ussd.cache import Cache, cache_data, cache_data_key, get_cached_data
from cic_ussd.account.chain import Chain
from cic_ussd.db.models.base import SessionBase
from cic_ussd.db.models.account import Account
from cic_ussd.processor.util import wait_for_cache
from cic_ussd.processor.poller import wait_for_cache
from cic_ussd.account.statement import filter_statement_transactions
from cic_ussd.account.transaction import transaction_actors
from cic_ussd.account.tokens import (collate_token_metadata,
@ -32,14 +32,14 @@ celery_app = celery.current_app
@celery_app.task(bind=True, base=CriticalSQLAlchemyTask)
def account_creation_callback(self, result: str, url: str, status_code: int):
def account_creation_callback(self, result: str, param: str, status_code: int):
"""This function defines a task that creates a user and
:param self: Reference providing access to the callback task instance.
:type self: celery.Task
:param result: The blockchain address for the created account
:type result: str
:param url: URL provided to callback task in cic-eth should http be used for callback.
:type url: str
:param param: URL provided to callback task in cic-eth should http be used for callback.
:type param: str
:param status_code: The status of the task to create an account
:type status_code: int
"""
@ -69,6 +69,15 @@ def account_creation_callback(self, result: str, url: str, status_code: int):
set_active_token(blockchain_address=result, token_symbol=token_symbol)
queue = self.request.delivery_info.get('routing_key')
preferences_data = {"preferred_language": param}
# temporarily caching selected language
key = cache_data_key(bytes.fromhex(result), MetadataPointer.PREFERENCES)
cache_data(key, json.dumps(preferences_data))
s_preferences_metadata = celery.signature(
'cic_ussd.tasks.metadata.add_preferences_metadata', [result, preferences_data], queue=queue
)
s_preferences_metadata.apply_async()
s_phone_pointer = celery.signature(
'cic_ussd.tasks.metadata.add_phone_pointer', [result, phone_number], queue=queue
)

View File

@ -1,9 +1,56 @@
"""
This module is responsible for translation of ussd menu text based on a user's set preferred language.
"""
# standard imports
import json
import i18n
import os
from pathlib import Path
from typing import Optional
# external imports
from cic_translations.processor import generate_translation_files, parse_csv
from cic_types.condiments import MetadataPointer
# local imports
from cic_ussd.cache import cache_data, cache_data_key
from cic_ussd.validator import validate_presence
def generate_locale_files(locale_dir: str, schema_file_path: str, translation_builder_path: str):
""""""
translation_builder_files = os.listdir(translation_builder_path)
for file in translation_builder_files:
props = Path(file)
if props.suffix == '.csv':
parsed_csv = parse_csv(os.path.join(translation_builder_path, file))
generate_translation_files(
parsed_csv=parsed_csv,
schema_file_path=schema_file_path,
translation_file_type=props.stem,
translation_file_path=locale_dir
)
class Languages:
languages_dict: dict = None
@classmethod
def load_languages_dict(cls, languages_file: str):
with open(languages_file, "r") as languages_file:
cls.languages_dict = json.load(languages_file)
def cache_system_languages(self):
system_languages: list = list(self.languages_dict.values())
languages_list = []
for i in range(len(system_languages)):
language = f'{i + 1}. {system_languages[i]}'
languages_list.append(language)
key = cache_data_key('system:languages'.encode('utf-8'), MetadataPointer.NONE)
cache_data(key, json.dumps(languages_list))
def translation_for(key: str, preferred_language: Optional[str] = None, **kwargs) -> str:
"""

View File

@ -11,3 +11,6 @@ transitions=transitions/
host =
port =
ssl =
[system]
guardians_file = cic_ussd/data/sys/guardians.txt

View File

@ -6,3 +6,6 @@ password_pepper=QYbzKff6NhiQzY3ygl2BkiKOpER8RE/Upqs/5aZWW+I=
[machine]
states=states/
transitions=transitions/
[system]
guardians_file = cic_ussd/data/sys/guardians.txt

View File

@ -1,2 +1,2 @@
[chain]
spec = 'evm:foo:1:bar'
spec = evm:foo:1:bar

View File

@ -1,3 +1,10 @@
[locale]
fallback=sw
path=var/lib/locale/
path=cic_ussd/data/locale/
file_builders=cic_ussd/data/sys/
[schema]
file_path = data/schema
[languages]
file = cic_ussd/data/sys/languages.json

View File

@ -1,5 +1,5 @@
[ussd]
menu_file=cic_ussd/db/ussd_menu.json
menu_file=cic_ussd/data/sys/ussd_menu.json
service_code=*483*46#,*483*061#,*384*96#
user =
pass =

View File

@ -1,3 +1,10 @@
[locale]
fallback=sw
path=var/lib/locale/
path=cic_ussd/data/locale/
file_builders=cic_ussd/data/sys/
[schema]
file_path = /usr/local/lib/python3.8/site-packages/cic_translations/data/schema
[languages]
file = cic_ussd/data/sys/languages.json

View File

@ -1,5 +1,5 @@
[ussd]
menu_file=data/ussd_menu.json
menu_file=cic_ussd/data/sys/ussd_menu.json
service_code=*483*46#,*483*061#,*384*96#
user =
pass =

View File

@ -3,12 +3,10 @@ ARG DOCKER_REGISTRY="registry.gitlab.com/grassrootseconomics"
FROM $DOCKER_REGISTRY/cic-base-images:python-3.8.6-dev-e8eb2ee2
RUN apt-get install -y redis-server
# create secrets directory
RUN mkdir -vp pgp/keys
# create application directory
RUN mkdir -vp cic-ussd
RUN mkdir -vp data
ARG EXTRA_PIP_INDEX_URL=https://pip.grassrootseconomics.net
ARG EXTRA_PIP_ARGS=""
@ -25,7 +23,8 @@ RUN --mount=type=cache,mode=0755,target=/root/.cache/pip \
COPY . .
RUN python setup.py install
COPY cic_ussd/db/ussd_menu.json data/
# create local files directory
RUN mkdir -vp cic_ussd/data/locale
COPY docker/*.sh ./
RUN chmod +x /root/*.sh

View File

@ -4,8 +4,9 @@ billiard==3.6.4.0
bcrypt==3.2.0
celery==4.4.7
cffi==1.14.6
cic-eth~=0.12.6
cic-eth~=0.12.7
cic-notify~=0.4.0a12
cic-translations~=0.0.3
cic-types~=0.2.1a8
confini~=0.5.2
cic-eth-aux-erc20-demurrage-token~=0.0.3

View File

@ -13,5 +13,7 @@
"products_edit_pin_authorization",
"account_balances_pin_authorization",
"account_statement_pin_authorization",
"account_balances"
"account_balances",
"middle_language_set",
"last_language_set"
]

View File

@ -2,6 +2,8 @@
"start",
"scan_data",
"initial_language_selection",
"initial_middle_language_set",
"initial_last_language_set",
"initial_pin_entry",
"initial_pin_confirmation",
"change_preferred_language"

View File

@ -1,12 +1,12 @@
cic-eth[services]~=0.12.4a13
Faker==8.1.2
cic-eth[services]~=0.12.7
Faker==11.1.0
faker-e164==0.1.0
pytest==6.2.4
pytest-alembic==0.2.5
pytest==6.2.5
pytest-alembic==0.7.0
pytest-celery==0.0.0a1
pytest-cov==2.10.1
pytest-mock==3.3.1
pytest-cov==3.0.0
pytest-mock==3.6.1
pytest-ordering==0.6
pytest-redis==2.0.0
requests-mock==1.8.0
tavern==1.14.2
pytest-redis==2.3.0
requests-mock==1.9.3
tavern==1.18.0

View File

@ -2,10 +2,16 @@
# external imports
import pytest
from cic_types.condiments import MetadataPointer
# local imports
from cic_ussd.account.balance import calculate_available_balance, get_balances, get_cached_available_balance
from cic_ussd.account.balance import (calculate_available_balance,
get_balances,
get_cached_adjusted_balance,
get_cached_available_balance)
from cic_ussd.account.chain import Chain
from cic_ussd.account.tokens import get_cached_token_data_list
from cic_ussd.cache import cache_data_key, get_cached_data
from cic_ussd.error import CachedDataNotFoundError
# test imports
@ -57,19 +63,45 @@ def test_calculate_available_balance(activated_account,
'balance_outgoing': balance_outgoing,
'balance_incoming': balance_incoming
}
assert calculate_available_balance(balances) == available_balance
assert calculate_available_balance(balances, 6) == available_balance
def test_get_cached_available_balance(activated_account,
balances,
cache_balances,
cache_default_token_data,
load_chain_spec):
cached_available_balance = get_cached_available_balance(activated_account.blockchain_address)
available_balance = calculate_available_balance(balances[0])
load_chain_spec,
token_symbol):
identifier = [bytes.fromhex(activated_account.blockchain_address), token_symbol.encode('utf-8')]
cached_available_balance = get_cached_available_balance(6, identifier)
available_balance = calculate_available_balance(balances[0], 6)
assert cached_available_balance == available_balance
address = blockchain_address()
with pytest.raises(CachedDataNotFoundError) as error:
cached_available_balance = get_cached_available_balance(address)
identifier = [bytes.fromhex(address), token_symbol.encode('utf-8')]
key = cache_data_key(identifier=identifier, salt=MetadataPointer.BALANCES)
cached_available_balance = get_cached_available_balance(6, identifier)
assert cached_available_balance is None
assert str(error.value) == f'No cached available balance for address: {address}'
assert str(error.value) == f'No cached available balance at {key}'
def test_get_cached_adjusted_balance(activated_account, cache_adjusted_balances, token_symbol):
identifier = bytes.fromhex(activated_account.blockchain_address)
balances_identifier = [identifier, token_symbol.encode('utf-8')]
key = cache_data_key(balances_identifier, MetadataPointer.BALANCES_ADJUSTED)
adjusted_balances = get_cached_data(key)
assert get_cached_adjusted_balance(balances_identifier) == adjusted_balances
def test_get_account_tokens_balance(activated_account,
cache_token_data_list,
celery_session_worker,
load_chain_spec,
load_config,
mock_async_balance_api_query,
token_symbol):
blockchain_address = activated_account.blockchain_address
chain_str = Chain.spec.__str__()
get_balances(blockchain_address, chain_str, token_symbol, asynchronous=True)
assert mock_async_balance_api_query.get('address') == blockchain_address
assert mock_async_balance_api_query.get('token_symbol') == token_symbol

View File

@ -0,0 +1,21 @@
# standard imports
import os
# external imports
# local imports
from cic_ussd.account.guardianship import Guardianship
# test imports
from tests.fixtures.config import root_directory
def test_guardianship(load_config, setup_guardianship):
guardians_file = os.path.join(root_directory, load_config.get('SYSTEM_GUARDIANS_FILE'))
with open(guardians_file, 'r') as system_guardians:
guardians = [line.strip() for line in system_guardians]
assert Guardianship.guardians == guardians
guardianship = Guardianship()
assert guardianship.is_system_guardian(Guardianship.guardians[0]) is True
assert guardianship.is_system_guardian('+254712345678') is False

View File

@ -11,8 +11,7 @@ from cic_ussd.account.statement import (filter_statement_transactions,
generate,
get_cached_statement,
parse_statement_transactions,
query_statement,
statement_transaction_set)
query_statement)
from cic_ussd.account.transaction import transaction_actors
from cic_ussd.cache import cache_data_key, get_cached_data
@ -74,12 +73,3 @@ def test_query_statement(blockchain_address, limit, load_chain_spec, activated_a
query_statement(blockchain_address, limit)
assert mock_transaction_list_query.get('address') == blockchain_address
assert mock_transaction_list_query.get('limit') == limit
def test_statement_transaction_set(cache_default_token_data, load_chain_spec, preferences, set_locale_files, statement):
parsed_transactions = parse_statement_transactions(statement)
preferred_language = preferences.get('preferred_language')
transaction_set = statement_transaction_set(preferred_language, parsed_transactions)
transaction_set.startswith('Sent')
transaction_set = statement_transaction_set(preferred_language, [])
transaction_set.startswith('No')

View File

@ -1,17 +1,80 @@
# standard imports
import hashlib
import json
# external imports
import pytest
from cic_types.condiments import MetadataPointer
# local imports
from cic_ussd.account.chain import Chain
from cic_ussd.account.tokens import get_cached_default_token, get_default_token_symbol, query_default_token
from cic_ussd.account.tokens import (collate_token_metadata,
create_account_tokens_list,
get_active_token_symbol,
get_default_token_symbol,
get_cached_default_token,
get_cached_token_data,
get_cached_token_data_list,
get_cached_token_symbol_list,
hashed_token_proof,
handle_token_symbol_list,
order_account_tokens_list,
parse_token_list,
process_token_data,
query_default_token,
query_token_data,
remove_from_account_tokens_list,
set_active_token)
from cic_ussd.cache import cache_data, cache_data_key, get_cached_data
from cic_ussd.error import CachedDataNotFoundError
# test imports
def test_collate_token_metadata(token_meta_symbol, token_proof_symbol):
description = token_proof_symbol.get('description')
issuer = token_proof_symbol.get('issuer')
location = token_meta_symbol.get('location')
contact = token_meta_symbol.get('contact')
data = {
'description': description,
'issuer': issuer,
'location': location,
'contact': contact
}
assert collate_token_metadata(token_proof_symbol, token_meta_symbol) == data
def test_create_account_tokens_list(activated_account,
cache_balances,
cache_token_data,
cache_token_symbol_list,
init_cache):
create_account_tokens_list(activated_account.blockchain_address)
key = cache_data_key(bytes.fromhex(activated_account.blockchain_address), MetadataPointer.TOKEN_DATA_LIST)
cached_data_list = json.loads(get_cached_data(key))
data = get_cached_token_data_list(activated_account.blockchain_address)
assert cached_data_list == data
def test_get_active_token_symbol(activated_account, set_active_token, valid_recipient):
identifier = bytes.fromhex(activated_account.blockchain_address)
key = cache_data_key(identifier=identifier, salt=MetadataPointer.TOKEN_ACTIVE)
active_token_symbol = get_cached_data(key)
assert active_token_symbol == get_active_token_symbol(activated_account.blockchain_address)
with pytest.raises(CachedDataNotFoundError) as error:
get_active_token_symbol(valid_recipient.blockchain_address)
assert str(error.value) == 'No active token set.'
def test_get_cached_token_data(activated_account, cache_token_data, token_symbol):
identifier = [bytes.fromhex(activated_account.blockchain_address), token_symbol.encode('utf-8')]
key = cache_data_key(identifier, MetadataPointer.TOKEN_DATA)
token_data = json.loads(get_cached_data(key))
assert token_data == get_cached_token_data(activated_account.blockchain_address, token_symbol)
def test_get_cached_default_token(cache_default_token_data, default_token_data, load_chain_spec):
chain_str = Chain.spec.__str__()
cached_default_token = get_cached_default_token(chain_str)
@ -27,6 +90,84 @@ def test_get_default_token_symbol_from_api(default_token_data, load_chain_spec,
assert default_token_symbol == default_token_data['symbol']
def test_get_cached_token_data_list(activated_account, cache_token_data_list):
blockchain_address = activated_account.blockchain_address
key = cache_data_key(identifier=bytes.fromhex(blockchain_address), salt=MetadataPointer.TOKEN_DATA_LIST)
token_symbols_list = json.loads(get_cached_data(key))
assert token_symbols_list == get_cached_token_data_list(blockchain_address)
def test_get_cached_token_symbol_list(activated_account, cache_token_symbol_list):
blockchain_address = activated_account.blockchain_address
key = cache_data_key(identifier=bytes.fromhex(blockchain_address), salt=MetadataPointer.TOKEN_SYMBOLS_LIST)
token_symbols_list = json.loads(get_cached_data(key))
assert token_symbols_list == get_cached_token_symbol_list(blockchain_address)
def test_hashed_token_proof(token_proof_symbol):
hash_object = hashlib.new("sha256")
token_proof = json.dumps(token_proof_symbol)
hash_object.update(token_proof.encode('utf-8'))
assert hash_object.digest().hex() == hashed_token_proof(token_proof_symbol)
def test_handle_token_symbol_list(activated_account, init_cache):
handle_token_symbol_list(activated_account.blockchain_address, 'GFT')
cached_token_symbol_list = get_cached_token_symbol_list(activated_account.blockchain_address)
assert len(cached_token_symbol_list) == 1
handle_token_symbol_list(activated_account.blockchain_address, 'DET')
cached_token_symbol_list = get_cached_token_symbol_list(activated_account.blockchain_address)
assert len(cached_token_symbol_list) == 2
def test_order_account_tokens_list(activated_account, token_list_entries):
identifier = bytes.fromhex(activated_account.blockchain_address)
last_sent_token_key = cache_data_key(identifier=identifier, salt=MetadataPointer.TOKEN_LAST_SENT)
cache_data(last_sent_token_key, 'FII')
last_received_token_key = cache_data_key(identifier=identifier, salt=MetadataPointer.TOKEN_LAST_RECEIVED)
cache_data(last_received_token_key, 'DET')
ordered_list = order_account_tokens_list(token_list_entries, identifier)
assert ordered_list == [
{
'name': 'Fee',
'symbol': 'FII',
'issuer': 'Foo',
'contact': {
'phone': '+254712345678'
},
'location': 'Fum',
'balance': 50.0
},
{
'name': 'Demurrage Token',
'symbol': 'DET',
'issuer': 'Grassroots Economics',
'contact': {
'phone': '+254700000000',
'email': 'info@grassrootseconomics.org'},
'location': 'Fum',
'balance': 49.99
},
{
'name': 'Giftable Token',
'symbol': 'GFT',
'issuer': 'Grassroots Economics',
'contact': {
'phone': '+254700000000',
'email': 'info@grassrootseconomics.org'},
'location': 'Fum',
'balance': 60.0
}
]
def test_parse_token_list(token_list_entries):
parsed_token_list = ['1. FII 50.0', '2. GFT 60.0', '3. DET 49.99']
assert parsed_token_list == parse_token_list(token_list_entries)
def test_query_default_token(default_token_data, load_chain_spec, mock_sync_default_token_api_query):
chain_str = Chain.spec.__str__()
queried_default_token_data = query_default_token(chain_str)
@ -40,3 +181,38 @@ def test_get_default_token_symbol_from_cache(cache_default_token_data, default_t
default_token_symbol = get_default_token_symbol()
assert default_token_symbol is not None
assert default_token_symbol == default_token_data.get('symbol')
def test_remove_from_account_tokens_list(token_list_entries):
assert remove_from_account_tokens_list(token_list_entries, 'GFT') == ([{
'name': 'Giftable Token',
'symbol': 'GFT',
'issuer': 'Grassroots Economics',
'contact': {
'phone': '+254700000000',
'email': 'info@grassrootseconomics.org'
},
'location': 'Fum',
'balance': 60.0
}],
[
{
'name': 'Fee',
'symbol': 'FII',
'issuer': 'Foo',
'contact': {'phone': '+254712345678'},
'location': 'Fum',
'balance': 50.0
},
{
'name': 'Demurrage Token',
'symbol': 'DET',
'issuer': 'Grassroots Economics',
'contact': {
'phone': '+254700000000',
'email': 'info@grassrootseconomics.org'
},
'location': 'Fum',
'balance': 49.99
}
])

View File

@ -1,5 +1,4 @@
# standard imports
from decimal import Decimal
# external imports
import pytest
@ -37,11 +36,11 @@ def test_aux_transaction_data(preferences, set_locale_files, transactions_list):
@pytest.mark.parametrize("value, expected_result", [
(50000000, Decimal('50.00')),
(100000, Decimal('0.10'))
(50000000, 50.0),
(100000, 0.1)
])
def test_from_wei(cache_default_token_data, expected_result, value):
assert from_wei(value) == expected_result
assert from_wei(6, value) == expected_result
@pytest.mark.parametrize("value, expected_result", [
@ -49,7 +48,7 @@ def test_from_wei(cache_default_token_data, expected_result, value):
(0.10, 100000)
])
def test_to_wei(cache_default_token_data, expected_result, value):
assert to_wei(value) == expected_result
assert to_wei(6, value) == expected_result
@pytest.mark.parametrize("decimals, value, expected_result", [
@ -108,8 +107,8 @@ def test_outgoing_transaction_processor(activated_account,
activated_account.blockchain_address,
valid_recipient.blockchain_address)
outgoing_tx_processor.transfer(amount, token_symbol)
outgoing_tx_processor.transfer(amount, 6, token_symbol)
assert mock_transfer_api.get('from_address') == activated_account.blockchain_address
assert mock_transfer_api.get('to_address') == valid_recipient.blockchain_address
assert mock_transfer_api.get('value') == to_wei(amount)
assert mock_transfer_api.get('value') == to_wei(6, amount)
assert mock_transfer_api.get('token_symbol') == token_symbol

View File

@ -90,7 +90,7 @@ def test_standard_metadata_id(activated_account, cache_person_metadata, pending_
def test_account_create(init_cache, init_database, load_chain_spec, mock_account_creation_task_result, task_uuid):
chain_str = Chain.spec.__str__()
create(chain_str, phone_number(), init_database)
create(chain_str, phone_number(), init_database, 'en')
assert len(init_database.query(TaskTracker).all()) == 1
account_creation_data = get_cached_data(task_uuid)
assert json.loads(account_creation_data).get('status') == AccountStatus.PENDING.name

View File

@ -23,7 +23,7 @@ def test_ussd_metadata_handler(activated_account,
setup_metadata_signer):
identifier = bytes.fromhex(strip_0x(activated_account.blockchain_address))
cic_type = MetadataPointer.PERSON
metadata_client = UssdMetadataHandler(cic_type, identifier)
metadata_client = UssdMetadataHandler(cic_type=cic_type, identifier=identifier)
assert metadata_client.cic_type == cic_type
assert metadata_client.engine == 'pgp'
assert metadata_client.identifier == identifier

View File

@ -0,0 +1,72 @@
# standard imports
import json
# external imports
import pytest
import requests_mock
from cic_types.condiments import MetadataPointer
from requests.exceptions import HTTPError
# local imports
from cic_ussd.cache import cache_data_key, get_cached_data
from cic_ussd.metadata import TokenMetadata
from cic_ussd.metadata.tokens import token_metadata_handler, query_token_metadata, query_token_info
# test imports
def test_token_metadata_handler(activated_account,
init_cache,
setup_metadata_request_handler,
setup_metadata_signer,
token_meta_symbol,
token_symbol):
with requests_mock.Mocker(real_http=False) as request_mocker:
with pytest.raises(HTTPError) as error:
metadata_client = TokenMetadata(identifier=b'foo', cic_type=MetadataPointer.TOKEN_META_SYMBOL)
reason = 'Not Found'
status_code = 401
request_mocker.register_uri('GET', metadata_client.url, status_code=status_code, reason=reason)
token_metadata_handler(metadata_client)
assert str(error.value) == f'Client Error: {status_code}, reason: {reason}'
identifier = token_symbol.encode('utf-8')
metadata_client = TokenMetadata(identifier, cic_type=MetadataPointer.TOKEN_META_SYMBOL)
request_mocker.register_uri('GET', metadata_client.url, json=token_meta_symbol, status_code=200, reason='OK')
token_metadata_handler(metadata_client)
key = cache_data_key(identifier, MetadataPointer.TOKEN_META_SYMBOL)
cached_token_meta_symbol = get_cached_data(key)
assert json.loads(cached_token_meta_symbol) == token_meta_symbol
def test_query_token_metadata(init_cache,
setup_metadata_request_handler,
setup_metadata_signer,
token_meta_symbol,
token_proof_symbol,
token_symbol):
with requests_mock.Mocker(real_http=False) as request_mocker:
identifier = token_symbol.encode('utf-8')
metadata_client = TokenMetadata(identifier, cic_type=MetadataPointer.TOKEN_META_SYMBOL)
request_mocker.register_uri('GET', metadata_client.url, json=token_meta_symbol, status_code=200, reason='OK')
query_token_metadata(identifier)
key = cache_data_key(identifier, MetadataPointer.TOKEN_META_SYMBOL)
cached_token_meta_symbol = get_cached_data(key)
assert json.loads(cached_token_meta_symbol) == token_meta_symbol
def test_query_token_info(init_cache,
setup_metadata_request_handler,
setup_metadata_signer,
token_meta_symbol,
token_proof_symbol,
token_symbol):
with requests_mock.Mocker(real_http=False) as request_mocker:
identifier = token_symbol.encode('utf-8')
metadata_client = TokenMetadata(identifier, cic_type=MetadataPointer.TOKEN_PROOF_SYMBOL)
request_mocker.register_uri('GET', metadata_client.url, json=token_proof_symbol, status_code=200, reason='OK')
query_token_info(identifier)
key = cache_data_key(identifier, MetadataPointer.TOKEN_PROOF_SYMBOL)
cached_token_proof_symbol = get_cached_data(key)
assert json.loads(cached_token_proof_symbol) == token_proof_symbol

View File

@ -1,6 +1,6 @@
# standard imports
import json
import datetime
import os
# external imports
from cic_types.condiments import MetadataPointer
@ -10,195 +10,464 @@ from cic_ussd.account.balance import get_cached_available_balance
from cic_ussd.account.metadata import get_cached_preferred_language
from cic_ussd.account.statement import (
get_cached_statement,
parse_statement_transactions,
statement_transaction_set
parse_statement_transactions
)
from cic_ussd.account.tokens import get_default_token_symbol
from cic_ussd.account.tokens import (get_active_token_symbol,
get_cached_token_data)
from cic_ussd.account.transaction import from_wei, to_wei
from cic_ussd.cache import cache_data, cache_data_key
from cic_ussd.menu.ussd_menu import UssdMenu
from cic_ussd.cache import cache_data, cache_data_key, get_cached_data
from cic_ussd.metadata import PersonMetadata
from cic_ussd.phone_number import Support
from cic_ussd.processor.menu import response
from cic_ussd.processor.util import parse_person_metadata
from cic_ussd.processor.menu import response, MenuProcessor
from cic_ussd.processor.util import parse_person_metadata, ussd_menu_list
from cic_ussd.translation import translation_for
# test imports
def test_menu_processor(activated_account,
balances,
cache_balances,
cache_default_token_data,
cache_preferences,
cache_person_metadata,
cache_statement,
celery_session_worker,
generic_ussd_session,
init_database,
load_chain_spec,
load_support_phone,
load_ussd_menu,
mock_get_adjusted_balance,
mock_sync_balance_api_query,
mock_transaction_list_query,
valid_recipient):
preferred_language = get_cached_preferred_language(activated_account.blockchain_address)
available_balance = get_cached_available_balance(activated_account.blockchain_address)
token_symbol = get_default_token_symbol()
with_available_balance = 'ussd.kenya.account_balances.available_balance'
with_fees = 'ussd.kenya.account_balances.with_fees'
ussd_menu = UssdMenu.find_by_name('account_balances')
name = ussd_menu.get('name')
resp = response(activated_account, 'ussd.kenya.account_balances', name, init_database, generic_ussd_session)
def test_account_balance(activated_account, cache_balances, cache_preferences, cache_token_data,
generic_ussd_session, init_database, set_active_token):
"""blockchain_address = activated_account.blockchain_address
token_symbol = get_active_token_symbol(blockchain_address)
token_data = get_cached_token_data(blockchain_address, token_symbol)
preferred_language = get_cached_preferred_language(blockchain_address)
decimals = token_data.get("decimals")
identifier = bytes.fromhex(blockchain_address)
balances_identifier = [identifier, token_symbol.encode('utf-8')]
available_balance = get_cached_available_balance(decimals, balances_identifier)
with_available_balance = 'ussd.account_balances.available_balance'
resp = response(activated_account, with_available_balance, with_available_balance[5:], init_database,
generic_ussd_session)
assert resp == translation_for(with_available_balance,
preferred_language,
available_balance=available_balance,
token_symbol=token_symbol)
identifier = bytes.fromhex(activated_account.blockchain_address)
key = cache_data_key(identifier, MetadataPointer.BALANCES_ADJUSTED)
with_fees = 'ussd.account_balances.with_fees'
key = cache_data_key(balances_identifier, MetadataPointer.BALANCES_ADJUSTED)
adjusted_balance = 45931650.64654012
cache_data(key, json.dumps(adjusted_balance))
resp = response(activated_account, 'ussd.kenya.account_balances', name, init_database, generic_ussd_session)
tax_wei = to_wei(int(available_balance)) - int(adjusted_balance)
tax = from_wei(int(tax_wei))
resp = response(activated_account, with_fees, with_fees[5:], init_database, generic_ussd_session)
tax_wei = to_wei(decimals, int(available_balance)) - int(adjusted_balance)
tax = from_wei(decimals, int(tax_wei))
assert resp == translation_for(key=with_fees,
preferred_language=preferred_language,
available_balance=available_balance,
tax=tax,
token_symbol=token_symbol)
token_symbol=token_symbol)"""
pass
cached_statement = get_cached_statement(activated_account.blockchain_address)
statement = json.loads(cached_statement)
statement_transactions = parse_statement_transactions(statement)
transaction_sets = [statement_transactions[tx:tx + 3] for tx in range(0, len(statement_transactions), 3)]
first_transaction_set = []
middle_transaction_set = []
last_transaction_set = []
if transaction_sets:
first_transaction_set = statement_transaction_set(preferred_language, transaction_sets[0])
if len(transaction_sets) >= 2:
middle_transaction_set = statement_transaction_set(preferred_language, transaction_sets[1])
if len(transaction_sets) >= 3:
last_transaction_set = statement_transaction_set(preferred_language, transaction_sets[2])
display_key = 'ussd.kenya.first_transaction_set'
ussd_menu = UssdMenu.find_by_name('first_transaction_set')
name = ussd_menu.get('name')
resp = response(activated_account, display_key, name, init_database, generic_ussd_session)
def test_account_statement(activated_account,
cache_preferences,
cache_statement,
generic_ussd_session,
init_database,
set_active_token,
set_locale_files):
blockchain_address = activated_account.blockchain_address
preferred_language = get_cached_preferred_language(blockchain_address)
cached_statement = get_cached_statement(blockchain_address)
statement_list = parse_statement_transactions(statement=json.loads(cached_statement))
first_transaction_set = 'ussd.first_transaction_set'
middle_transaction_set = 'ussd.middle_transaction_set'
last_transaction_set = 'ussd.last_transaction_set'
fallback = translation_for('helpers.no_transaction_history', preferred_language)
transaction_sets = ussd_menu_list(fallback=fallback, menu_list=statement_list, split=3)
resp = response(activated_account, first_transaction_set, first_transaction_set[5:], init_database,
generic_ussd_session)
assert resp == translation_for(first_transaction_set, preferred_language, first_transaction_set=transaction_sets[0])
resp = response(activated_account, middle_transaction_set, middle_transaction_set[5:], init_database,
generic_ussd_session)
assert resp == translation_for(middle_transaction_set, preferred_language,
middle_transaction_set=transaction_sets[1])
resp = response(activated_account, last_transaction_set, last_transaction_set[5:], init_database,
generic_ussd_session)
assert resp == translation_for(last_transaction_set, preferred_language, last_transaction_set=transaction_sets[2])
assert resp == translation_for(display_key, preferred_language, first_transaction_set=first_transaction_set)
display_key = 'ussd.kenya.middle_transaction_set'
ussd_menu = UssdMenu.find_by_name('middle_transaction_set')
name = ussd_menu.get('name')
resp = response(activated_account, display_key, name, init_database, generic_ussd_session)
def test_add_guardian_pin_authorization(activated_account,
cache_preferences,
guardian_account,
generic_ussd_session,
init_database):
blockchain_address = activated_account.blockchain_address
preferred_language = get_cached_preferred_language(blockchain_address)
add_guardian_pin_authorization = 'ussd.add_guardian_pin_authorization'
activated_account.add_guardian(guardian_account.phone_number)
init_database.flush()
generic_ussd_session['external_session_id'] = os.urandom(20).hex()
generic_ussd_session['msisdn'] = guardian_account.phone_number
generic_ussd_session['data'] = {'guardian_phone_number': guardian_account.phone_number}
generic_ussd_session['state'] = 'add_guardian_pin_authorization'
resp = response(activated_account,
add_guardian_pin_authorization,
add_guardian_pin_authorization[5:],
init_database,
generic_ussd_session)
assert resp == translation_for(f'{add_guardian_pin_authorization}.first', preferred_language,
guardian_information=guardian_account.standard_metadata_id())
assert resp == translation_for(display_key, preferred_language, middle_transaction_set=middle_transaction_set)
display_key = 'ussd.kenya.last_transaction_set'
ussd_menu = UssdMenu.find_by_name('last_transaction_set')
name = ussd_menu.get('name')
resp = response(activated_account, display_key, name, init_database, generic_ussd_session)
def test_guardian_list(activated_account,
cache_preferences,
generic_ussd_session,
guardian_account,
init_database):
blockchain_address = activated_account.blockchain_address
preferred_language = get_cached_preferred_language(blockchain_address)
guardians_list = 'ussd.guardian_list'
guardians_list_header = translation_for('helpers.guardians_list_header', preferred_language)
guardian_information = guardian_account.standard_metadata_id()
guardians = guardians_list_header + '\n' + f'{guardian_information}\n'
activated_account.add_guardian(guardian_account.phone_number)
init_database.flush()
resp = response(activated_account, guardians_list, guardians_list[5:], init_database, generic_ussd_session)
assert resp == translation_for(guardians_list, preferred_language, guardians_list=guardians)
guardians = translation_for('helpers.no_guardians_list', preferred_language)
identifier = bytes.fromhex(guardian_account.blockchain_address)
key = cache_data_key(identifier, MetadataPointer.PREFERENCES)
cache_data(key, json.dumps({'preferred_language': preferred_language}))
resp = response(guardian_account, guardians_list, guardians_list[5:], init_database, generic_ussd_session)
assert resp == translation_for(guardians_list, preferred_language, guardians_list=guardians)
assert resp == translation_for(display_key, preferred_language, last_transaction_set=last_transaction_set)
display_key = 'ussd.kenya.display_user_metadata'
ussd_menu = UssdMenu.find_by_name('display_user_metadata')
name = ussd_menu.get('name')
identifier = bytes.fromhex(activated_account.blockchain_address)
def test_account_tokens(activated_account, cache_token_data_list, celery_session_worker, generic_ussd_session,
init_cache, init_database):
"""blockchain_address = activated_account.blockchain_address
preferred_language = get_cached_preferred_language(blockchain_address)
cached_token_data_list = get_cached_token_data_list(blockchain_address)
token_data_list = ['1. GFT 50.0']
fallback = translation_for('helpers.no_tokens_list', preferred_language)
token_list_sets = ussd_menu_list(fallback=fallback, menu_list=token_data_list, split=3)
first_account_tokens_set = 'ussd.first_account_tokens_set'
middle_account_tokens_set = 'ussd.middle_account_tokens_set'
last_account_tokens_set = 'ussd.last_account_tokens_set'
resp = response(activated_account, first_account_tokens_set, first_account_tokens_set[5:], init_database,
generic_ussd_session)
assert resp == translation_for(first_account_tokens_set, preferred_language,
first_account_tokens_set=token_list_sets[0])
assert generic_ussd_session.get('data').get('account_tokens_list') == cached_token_data_list
resp = response(activated_account, middle_account_tokens_set, middle_account_tokens_set[5:], init_database,
generic_ussd_session)
assert resp == translation_for(middle_account_tokens_set, preferred_language,
middle_account_tokens_set=token_list_sets[1])
resp = response(activated_account, last_account_tokens_set, last_account_tokens_set[5:], init_database,
generic_ussd_session)
assert resp == translation_for(last_account_tokens_set, preferred_language,
last_account_tokens_set=token_list_sets[2])"""
pass
def test_help(activated_account, cache_preferences, generic_ussd_session, init_database):
blockchain_address = activated_account.blockchain_address
preferred_language = get_cached_preferred_language(blockchain_address)
help = 'ussd.help'
resp = response(activated_account, help, help[5:], init_database, generic_ussd_session)
assert resp == translation_for(help, preferred_language, support_phone=Support.phone_number)
def test_person_data(activated_account, cache_person_metadata, cache_preferences, cached_ussd_session,
generic_ussd_session, init_database):
blockchain_address = activated_account.blockchain_address
preferred_language = get_cached_preferred_language(blockchain_address)
identifier = bytes.fromhex(blockchain_address)
display_user_metadata = 'ussd.display_user_metadata'
person_metadata = PersonMetadata(identifier)
cached_person_metadata = person_metadata.get_cached_metadata()
resp = response(activated_account, display_key, name, init_database, generic_ussd_session)
assert resp == parse_person_metadata(cached_person_metadata, display_key, preferred_language)
resp = response(activated_account, display_user_metadata, display_user_metadata[5:], init_database,
generic_ussd_session)
assert resp == parse_person_metadata(cached_person_metadata, display_user_metadata, preferred_language)
display_key = 'ussd.kenya.account_balances_pin_authorization'
ussd_menu = UssdMenu.find_by_name('account_balances_pin_authorization')
name = ussd_menu.get('name')
resp = response(activated_account, display_key, name, init_database, generic_ussd_session)
assert resp == translation_for(f'{display_key}.first', preferred_language)
activated_account.failed_pin_attempts = 1
resp = response(activated_account, display_key, name, init_database, generic_ussd_session)
retry_pin_entry = translation_for('ussd.kenya.retry_pin_entry', preferred_language, remaining_attempts=2)
assert resp == translation_for(f'{display_key}.retry', preferred_language, retry_pin_entry=retry_pin_entry)
activated_account.failed_pin_attempts = 0
def test_guarded_account_metadata(activated_account, generic_ussd_session, init_database):
reset_guarded_pin_authorization = 'ussd.reset_guarded_pin_authorization'
generic_ussd_session['data'] = {'guarded_account_phone_number': activated_account.phone_number}
menu_processor = MenuProcessor(activated_account, reset_guarded_pin_authorization,
reset_guarded_pin_authorization[5:], init_database, generic_ussd_session)
assert menu_processor.guarded_account_metadata() == activated_account.standard_metadata_id()
display_key = 'ussd.kenya.start'
ussd_menu = UssdMenu.find_by_name('start')
name = ussd_menu.get('name')
resp = response(activated_account, display_key, name, init_database, generic_ussd_session)
assert resp == translation_for(display_key,
def test_guardian_metadata(activated_account, generic_ussd_session, guardian_account, init_database):
add_guardian_pin_authorization = 'ussd.add_guardian_pin_authorization'
generic_ussd_session['data'] = {'guardian_phone_number': guardian_account.phone_number}
menu_processor = MenuProcessor(activated_account, add_guardian_pin_authorization,
add_guardian_pin_authorization[5:], init_database, generic_ussd_session)
assert menu_processor.guardian_metadata() == guardian_account.standard_metadata_id()
def test_language(activated_account, cache_preferences, generic_ussd_session, init_database, load_languages):
blockchain_address = activated_account.blockchain_address
preferred_language = get_cached_preferred_language(blockchain_address)
initial_language_selection = 'ussd.initial_language_selection'
select_preferred_language = 'ussd.select_preferred_language'
initial_middle_language_set = 'ussd.initial_middle_language_set'
middle_language_set = 'ussd.middle_language_set'
initial_last_language_set = 'ussd.initial_last_language_set'
last_language_set = 'ussd.last_language_set'
key = cache_data_key('system:languages'.encode('utf-8'), MetadataPointer.NONE)
cached_system_languages = get_cached_data(key)
language_list: list = json.loads(cached_system_languages)
fallback = translation_for('helpers.no_language_list', preferred_language)
language_list_sets = ussd_menu_list(fallback=fallback, menu_list=language_list, split=3)
resp = response(activated_account, initial_language_selection, initial_language_selection[5:], init_database,
generic_ussd_session)
assert resp == translation_for(initial_language_selection, preferred_language,
first_language_set=language_list_sets[0])
resp = response(activated_account, select_preferred_language, select_preferred_language[5:], init_database,
generic_ussd_session)
assert resp == translation_for(select_preferred_language, preferred_language,
first_language_set=language_list_sets[0])
resp = response(activated_account, initial_middle_language_set, initial_middle_language_set[5:], init_database,
generic_ussd_session)
assert resp == translation_for(initial_middle_language_set, preferred_language,
middle_language_set=language_list_sets[1])
resp = response(activated_account, initial_last_language_set, initial_last_language_set[5:], init_database,
generic_ussd_session)
assert resp == translation_for(initial_last_language_set, preferred_language,
last_language_set=language_list_sets[2])
resp = response(activated_account, middle_language_set, middle_language_set[5:], init_database,
generic_ussd_session)
assert resp == translation_for(middle_language_set, preferred_language, middle_language_set=language_list_sets[1])
resp = response(activated_account, last_language_set, last_language_set[5:], init_database, generic_ussd_session)
assert resp == translation_for(last_language_set, preferred_language, last_language_set=language_list_sets[2])
def test_account_creation_prompt(activated_account, cache_preferences, generic_ussd_session, init_database,
load_languages):
blockchain_address = activated_account.blockchain_address
preferred_language = get_cached_preferred_language(blockchain_address)
user_input = ''
if preferred_language == 'en':
user_input = '1'
elif preferred_language == 'sw':
user_input = '2'
account_creation_prompt = 'ussd.account_creation_prompt'
generic_ussd_session['user_input'] = user_input
resp = response(activated_account, account_creation_prompt, account_creation_prompt[5:], init_database,
generic_ussd_session)
assert resp == translation_for(account_creation_prompt, preferred_language)
def test_reset_guarded_pin_authorization(activated_account, cache_preferences, generic_ussd_session, guardian_account,
init_database):
blockchain_address = activated_account.blockchain_address
preferred_language = get_cached_preferred_language(blockchain_address)
reset_guarded_pin_authorization = 'ussd.reset_guarded_pin_authorization'
generic_ussd_session['external_session_id'] = os.urandom(20).hex()
generic_ussd_session['msisdn'] = guardian_account.phone_number
generic_ussd_session['data'] = {'guarded_account_phone_number': activated_account.phone_number}
resp = response(activated_account,
reset_guarded_pin_authorization,
reset_guarded_pin_authorization[5:],
init_database,
generic_ussd_session)
assert resp == translation_for(f'{reset_guarded_pin_authorization}.first', preferred_language,
guarded_account_information=activated_account.phone_number)
def test_start(activated_account, cache_balances, cache_preferences, cache_token_data, cache_token_data_list,
cache_token_symbol_list, celery_session_worker, generic_ussd_session, init_database, load_chain_spec,
mock_sync_balance_api_query, set_active_token):
blockchain_address = activated_account.blockchain_address
preferred_language = get_cached_preferred_language(blockchain_address)
token_symbol = get_active_token_symbol(blockchain_address)
token_data = get_cached_token_data(blockchain_address, token_symbol)
decimals = token_data.get("decimals")
identifier = bytes.fromhex(blockchain_address)
balances_identifier = [identifier, token_symbol.encode('utf-8')]
available_balance = get_cached_available_balance(decimals, balances_identifier)
start = 'ussd.start'
resp = response(activated_account, start, start[5:], init_database, generic_ussd_session)
assert resp == translation_for(start,
preferred_language,
account_balance=available_balance,
account_token_name=token_symbol)
display_key = 'ussd.kenya.start'
ussd_menu = UssdMenu.find_by_name('start')
name = ussd_menu.get('name')
older_timestamp = (activated_account.created - datetime.timedelta(days=35))
activated_account.created = older_timestamp
init_database.flush()
response(activated_account, display_key, name, init_database, generic_ussd_session)
assert mock_get_adjusted_balance['timestamp'] == int((datetime.datetime.now() - datetime.timedelta(days=30)).timestamp())
display_key = 'ussd.kenya.transaction_pin_authorization'
ussd_menu = UssdMenu.find_by_name('transaction_pin_authorization')
name = ussd_menu.get('name')
def test_token_selection_pin_authorization(activated_account, cache_preferences, cache_token_data, generic_ussd_session,
init_database, set_active_token):
blockchain_address = activated_account.blockchain_address
token_symbol = get_active_token_symbol(blockchain_address)
token_data = get_cached_token_data(blockchain_address, token_symbol)
preferred_language = get_cached_preferred_language(blockchain_address)
token_selection_pin_authorization = 'ussd.token_selection_pin_authorization'
generic_ussd_session['data'] = {'selected_token': token_data}
resp = response(activated_account,
token_selection_pin_authorization,
token_selection_pin_authorization[5:],
init_database,
generic_ussd_session)
token_name = token_data.get('name')
token_symbol = token_data.get('symbol')
token_issuer = token_data.get('issuer')
token_contact = token_data.get('contact')
token_location = token_data.get('location')
data = f'{token_name} ({token_symbol})\n{token_issuer}\n{token_contact}\n{token_location}\n'
assert resp == translation_for(f'{token_selection_pin_authorization}.first', preferred_language,
token_data=data)
def test_transaction_pin_authorization(activated_account, cache_preferences, cache_token_data, generic_ussd_session,
init_database, set_active_token, valid_recipient):
blockchain_address = activated_account.blockchain_address
token_symbol = get_active_token_symbol(blockchain_address)
token_data = get_cached_token_data(blockchain_address, token_symbol)
preferred_language = get_cached_preferred_language(blockchain_address)
decimals = token_data.get("decimals")
transaction_pin_authorization = 'ussd.transaction_pin_authorization'
generic_ussd_session['data'] = {
'recipient_phone_number': valid_recipient.phone_number,
'transaction_amount': '15'
}
resp = response(activated_account, display_key, name, init_database, generic_ussd_session)
resp = response(activated_account, transaction_pin_authorization, transaction_pin_authorization[5:], init_database,
generic_ussd_session)
user_input = generic_ussd_session.get('data').get('transaction_amount')
transaction_amount = to_wei(value=int(user_input))
transaction_amount = to_wei(decimals, int(user_input))
tx_recipient_information = valid_recipient.standard_metadata_id()
tx_sender_information = activated_account.standard_metadata_id()
assert resp == translation_for(f'{display_key}.first',
assert resp == translation_for(f'{transaction_pin_authorization}.first',
preferred_language,
recipient_information=tx_recipient_information,
transaction_amount=from_wei(transaction_amount),
transaction_amount=from_wei(decimals, transaction_amount),
token_symbol=token_symbol,
sender_information=tx_sender_information)
display_key = 'ussd.kenya.exit_insufficient_balance'
ussd_menu = UssdMenu.find_by_name('exit_insufficient_balance')
name = ussd_menu.get('name')
def test_guardian_exits(activated_account, cache_preferences, cache_token_data, generic_ussd_session, guardian_account,
init_database, set_active_token):
blockchain_address = activated_account.blockchain_address
preferred_language = get_cached_preferred_language(blockchain_address)
generic_ussd_session['data'] = {'guardian_phone_number': guardian_account.phone_number}
# testing exit guardian addition success
exit_guardian_addition_success = 'ussd.exit_guardian_addition_success'
resp = response(activated_account, exit_guardian_addition_success, exit_guardian_addition_success[5:],
init_database, generic_ussd_session)
assert resp == translation_for(exit_guardian_addition_success, preferred_language,
guardian_information=guardian_account.standard_metadata_id())
# testing exit guardian removal success
exit_guardian_removal_success = 'ussd.exit_guardian_removal_success'
resp = response(activated_account, exit_guardian_removal_success, exit_guardian_removal_success[5:],
init_database, generic_ussd_session)
assert resp == translation_for(exit_guardian_removal_success, preferred_language,
guardian_information=guardian_account.standard_metadata_id())
generic_ussd_session['data'] = {'failure_reason': 'foo'}
# testing exit invalid guardian addition
exit_invalid_guardian_addition = 'ussd.exit_invalid_guardian_addition'
resp = response(activated_account, exit_invalid_guardian_addition, exit_invalid_guardian_addition[5:],
init_database, generic_ussd_session)
assert resp == translation_for(exit_invalid_guardian_addition, preferred_language, error_exit='foo')
# testing exit invalid guardian removal
exit_invalid_guardian_removal = 'ussd.exit_invalid_guardian_removal'
resp = response(activated_account, exit_invalid_guardian_removal, exit_invalid_guardian_removal[5:],
init_database, generic_ussd_session)
assert resp == translation_for(exit_invalid_guardian_removal, preferred_language, error_exit='foo')
def test_exit_pin_reset_initiated_success(activated_account, cache_preferences, generic_ussd_session, init_database):
blockchain_address = activated_account.blockchain_address
preferred_language = get_cached_preferred_language(blockchain_address)
exit_pin_reset_initiated_success = 'ussd.exit_pin_reset_initiated_success'
generic_ussd_session['data'] = {'guarded_account_phone_number': activated_account.phone_number}
resp = response(activated_account, exit_pin_reset_initiated_success, exit_pin_reset_initiated_success[5:],
init_database, generic_ussd_session)
assert resp == translation_for(exit_pin_reset_initiated_success,
preferred_language,
guarded_account_information=activated_account.standard_metadata_id())
def test_exit_insufficient_balance(activated_account, cache_balances, cache_preferences, cache_token_data,
generic_ussd_session, init_database, set_active_token, valid_recipient):
blockchain_address = activated_account.blockchain_address
token_symbol = get_active_token_symbol(blockchain_address)
token_data = get_cached_token_data(blockchain_address, token_symbol)
preferred_language = get_cached_preferred_language(blockchain_address)
decimals = token_data.get("decimals")
identifier = bytes.fromhex(blockchain_address)
balances_identifier = [identifier, token_symbol.encode('utf-8')]
available_balance = get_cached_available_balance(decimals, balances_identifier)
tx_recipient_information = valid_recipient.standard_metadata_id()
exit_insufficient_balance = 'ussd.exit_insufficient_balance'
generic_ussd_session['data'] = {
'recipient_phone_number': valid_recipient.phone_number,
'transaction_amount': '85'
}
transaction_amount = generic_ussd_session.get('data').get('transaction_amount')
transaction_amount = to_wei(value=int(transaction_amount))
resp = response(activated_account, display_key, name, init_database, generic_ussd_session)
assert resp == translation_for(display_key,
transaction_amount = to_wei(decimals, int(transaction_amount))
resp = response(activated_account, exit_insufficient_balance, exit_insufficient_balance[5:], init_database,
generic_ussd_session)
assert resp == translation_for(exit_insufficient_balance,
preferred_language,
amount=from_wei(transaction_amount),
amount=from_wei(decimals, transaction_amount),
token_symbol=token_symbol,
recipient_information=tx_recipient_information,
token_balance=available_balance)
display_key = 'ussd.kenya.exit_invalid_menu_option'
ussd_menu = UssdMenu.find_by_name('exit_invalid_menu_option')
name = ussd_menu.get('name')
resp = response(activated_account, display_key, name, init_database, generic_ussd_session)
assert resp == translation_for(display_key, preferred_language, support_phone=Support.phone_number)
display_key = 'ussd.kenya.exit_successful_transaction'
ussd_menu = UssdMenu.find_by_name('exit_successful_transaction')
name = ussd_menu.get('name')
def test_exit_invalid_menu_option(activated_account, cache_preferences, generic_ussd_session, init_database,
load_support_phone):
blockchain_address = activated_account.blockchain_address
preferred_language = get_cached_preferred_language(blockchain_address)
exit_invalid_menu_option = 'ussd.exit_invalid_menu_option'
resp = response(activated_account, exit_invalid_menu_option, exit_invalid_menu_option[5:], init_database,
generic_ussd_session)
assert resp == translation_for(exit_invalid_menu_option, preferred_language, support_phone=Support.phone_number)
def test_exit_pin_blocked(activated_account, cache_preferences, generic_ussd_session, init_database,
load_support_phone):
blockchain_address = activated_account.blockchain_address
preferred_language = get_cached_preferred_language(blockchain_address)
exit_pin_blocked = 'ussd.exit_pin_blocked'
resp = response(activated_account, exit_pin_blocked, exit_pin_blocked[5:], init_database, generic_ussd_session)
assert resp == translation_for(exit_pin_blocked, preferred_language, support_phone=Support.phone_number)
def test_exit_successful_token_selection(activated_account, cache_preferences, cache_token_data, generic_ussd_session,
init_database, set_active_token):
blockchain_address = activated_account.blockchain_address
token_symbol = get_active_token_symbol(blockchain_address)
token_data = get_cached_token_data(blockchain_address, token_symbol)
preferred_language = get_cached_preferred_language(blockchain_address)
exit_successful_token_selection = 'ussd.exit_successful_token_selection'
generic_ussd_session['data'] = {'selected_token': token_data}
resp = response(activated_account, exit_successful_token_selection, exit_successful_token_selection[5:],
init_database, generic_ussd_session)
assert resp == translation_for(exit_successful_token_selection, preferred_language, token_symbol=token_symbol)
def test_exit_successful_transaction(activated_account, cache_preferences, cache_token_data, generic_ussd_session,
init_database, set_active_token, valid_recipient):
blockchain_address = activated_account.blockchain_address
token_symbol = get_active_token_symbol(blockchain_address)
token_data = get_cached_token_data(blockchain_address, token_symbol)
preferred_language = get_cached_preferred_language(blockchain_address)
decimals = token_data.get("decimals")
tx_recipient_information = valid_recipient.standard_metadata_id()
tx_sender_information = activated_account.standard_metadata_id()
exit_successful_transaction = 'ussd.exit_successful_transaction'
generic_ussd_session['data'] = {
'recipient_phone_number': valid_recipient.phone_number,
'transaction_amount': '15'
}
transaction_amount = generic_ussd_session.get('data').get('transaction_amount')
transaction_amount = to_wei(value=int(transaction_amount))
resp = response(activated_account, display_key, name, init_database, generic_ussd_session)
assert resp == translation_for(display_key,
transaction_amount = to_wei(decimals, int(transaction_amount))
resp = response(activated_account, exit_successful_transaction, exit_successful_transaction[5:], init_database,
generic_ussd_session)
assert resp == translation_for(exit_successful_transaction,
preferred_language,
transaction_amount=from_wei(transaction_amount),
transaction_amount=from_wei(decimals, transaction_amount),
token_symbol=token_symbol,
recipient_information=tx_recipient_information,
sender_information=tx_sender_information)

View File

@ -0,0 +1,69 @@
# standard imports
import logging
import time
from queue import Queue
# external imports
import pytest
from cic_types.condiments import MetadataPointer
# local imports
from cic_ussd.cache import cache_data, cache_data_key, get_cached_data
from cic_ussd.error import MaxRetryReached
from cic_ussd.processor.poller import poller, wait_for_cache, wait_for_session_data
# test imports
def test_poller(activated_account, caplog, init_cache, token_symbol):
caplog.set_level(logging.DEBUG)
identifier = bytes.fromhex(activated_account.blockchain_address)
key = cache_data_key(identifier, MetadataPointer.TOKEN_ACTIVE)
with pytest.raises(MaxRetryReached) as error:
interval = 1
max_retry = 3
collected_values = [None, None, None]
poller(args=(key,), interval=interval, kwargs=None, max_retry=max_retry, target=get_cached_data)
assert str(error.value) == str(MaxRetryReached(collected_values, None))
cache_data(key, token_symbol)
poller(args=(key,), interval=interval, kwargs=None, max_retry=max_retry, target=get_cached_data)
assert f'Resource: {token_symbol} now available.' in caplog.text
def test_wait_for_cache(activated_account, caplog, init_cache, token_symbol):
caplog.set_level(logging.DEBUG)
identifier = bytes.fromhex(activated_account.blockchain_address)
key = cache_data_key(identifier, MetadataPointer.TOKEN_ACTIVE)
cache_data(key, token_symbol)
interval = 1
max_retry = 3
resource_name = 'Active Token'
wait_for_cache(identifier, resource_name, MetadataPointer.TOKEN_ACTIVE, interval, max_retry)
assert f'Polling for resource: {resource_name} at: {key} every: {interval} second(s) for {max_retry} seconds.' in caplog.text
def test_wait_for_session_data(activated_account, caplog, generic_ussd_session):
caplog.set_level(logging.DEBUG)
generic_ussd_session.__delitem__('data')
interval = 1
max_retry = 3
collected_values = [None, None, None]
resource_name = 'Foo Data'
session_data_key = 'foo'
with pytest.raises(MaxRetryReached) as error:
wait_for_session_data(resource_name, session_data_key, generic_ussd_session, interval, max_retry)
assert str(error.value) == str(MaxRetryReached(collected_values, None))
assert f'Data poller with max retry at: {max_retry}. Checking for every: {interval} seconds.' in caplog.text
generic_ussd_session['data'] = {}
with pytest.raises(MaxRetryReached) as error:
collected_values = [None, None, None]
wait_for_session_data(resource_name, session_data_key, generic_ussd_session, interval, max_retry)
assert f'Data poller with max retry at: {max_retry}. Checking for every: {interval} seconds.' in caplog.text
assert f'Session data poller for: {resource_name} with max retry at: {max_retry}. Checking for every: {interval} seconds.' in caplog.text
assert str(error.value) == str(MaxRetryReached(collected_values, None))
expected_value = 'bar'
generic_ussd_session['data'] = {'foo': expected_value}
wait_for_session_data(resource_name, session_data_key, generic_ussd_session, interval, max_retry)
assert f'Data poller with max retry at: {max_retry}. Checking for every: {interval} seconds.' in caplog.text
assert f'Session data poller for: {resource_name} with max retry at: {max_retry}. Checking for every: {interval} seconds.' in caplog.text
assert f'Resource: {expected_value} now available.' in caplog.text

View File

@ -10,13 +10,16 @@ from chainlib.hash import strip_0x
from cic_types.condiments import MetadataPointer
# local imports
from cic_ussd.account.chain import Chain
from cic_ussd.account.metadata import get_cached_preferred_language
from cic_ussd.cache import cache_data, cache_data_key, get_cached_data
from cic_ussd.db.models.task_tracker import TaskTracker
from cic_ussd.menu.ussd_menu import UssdMenu
from cic_ussd.metadata import PersonMetadata
from cic_ussd.processor.ussd import get_menu, handle_menu, handle_menu_operations
from cic_ussd.processor.ussd import (get_menu,
handle_menu,
handle_menu_operations)
from cic_ussd.processor.util import ussd_menu_list
from cic_ussd.state_machine.logic.language import preferred_langauge_from_selection
from cic_ussd.translation import translation_for
# test imports
@ -43,7 +46,7 @@ def test_handle_menu(activated_account,
ussd_menu = UssdMenu.find_by_name('exit_pin_blocked')
assert menu_resp.get('name') == ussd_menu.get('name')
menu_resp = handle_menu(pending_account, init_database)
ussd_menu = UssdMenu.find_by_name('initial_language_selection')
ussd_menu = UssdMenu.find_by_name('initial_pin_entry')
assert menu_resp.get('name') == ussd_menu.get('name')
identifier = bytes.fromhex(strip_0x(pending_account.blockchain_address))
key = cache_data_key(identifier, MetadataPointer.PREFERENCES)
@ -75,38 +78,62 @@ def test_get_menu(activated_account,
assert menu_resp.get('name') == ussd_menu.get('name')
def test_handle_menu_operations(activated_account,
cache_preferences,
celery_session_worker,
generic_ussd_session,
init_database,
init_cache,
load_chain_spec,
load_config,
mock_account_creation_task_result,
persisted_ussd_session,
person_metadata,
set_locale_files,
setup_metadata_request_handler,
setup_metadata_signer,
task_uuid):
# sourcery skip: extract-duplicate-method
chain_str = Chain.spec.__str__()
def test_handle_no_account_menu_operations(celery_session_worker,
init_cache,
init_database,
load_chain_spec,
load_config,
load_languages,
load_ussd_menu,
mock_account_creation_task_result,
pending_account,
persisted_ussd_session,
set_locale_files,
task_uuid):
initial_language_selection = 'ussd.initial_language_selection'
phone = phone_number()
external_session_id = os.urandom(20).hex()
valid_service_codes = load_config.get('USSD_SERVICE_CODE').split(",")
preferred_language = i18n.config.get('fallback')
resp = handle_menu_operations(chain_str, external_session_id, phone, None, valid_service_codes[0], init_database, '4444')
assert resp == translation_for('ussd.kenya.account_creation_prompt', preferred_language)
key = cache_data_key('system:languages'.encode('utf-8'), MetadataPointer.NONE)
cached_system_languages = get_cached_data(key)
language_list: list = json.loads(cached_system_languages)
fallback = translation_for('helpers.no_language_list', preferred_language)
language_list_sets = ussd_menu_list(fallback=fallback, menu_list=language_list, split=3)
resp = handle_menu_operations(external_session_id, phone, None, valid_service_codes[0], init_database, '')
assert resp == translation_for(initial_language_selection, preferred_language,
first_language_set=language_list_sets[0])
cached_ussd_session = get_cached_data(external_session_id)
ussd_session = json.loads(cached_ussd_session)
assert ussd_session['msisdn'] == phone
persisted_ussd_session.external_session_id = external_session_id
persisted_ussd_session.msisdn = phone
persisted_ussd_session.state = initial_language_selection[5:]
init_database.add(persisted_ussd_session)
init_database.commit()
account_creation_prompt = 'ussd.account_creation_prompt'
user_input = '2'
resp = handle_menu_operations(external_session_id, phone, None, valid_service_codes[0], init_database, user_input)
preferred_language = preferred_langauge_from_selection(user_input)
assert resp == translation_for(account_creation_prompt, preferred_language)
task_tracker = init_database.query(TaskTracker).filter_by(task_uuid=task_uuid).first()
assert task_tracker.task_uuid == task_uuid
cached_creation_task_uuid = get_cached_data(task_uuid)
creation_task_uuid_data = json.loads(cached_creation_task_uuid)
assert creation_task_uuid_data['status'] == 'PENDING'
def test_handle_account_menu_operations(activated_account,
cache_preferences,
celery_session_worker,
init_database,
load_config,
persisted_ussd_session,
person_metadata,
set_locale_files,
setup_metadata_request_handler,
setup_metadata_signer, ):
valid_service_codes = load_config.get('USSD_SERVICE_CODE').split(",")
identifier = bytes.fromhex(strip_0x(activated_account.blockchain_address))
person_metadata_client = PersonMetadata(identifier)
with requests_mock.Mocker(real_http=False) as request_mocker:
@ -117,6 +144,5 @@ def test_handle_menu_operations(activated_account,
phone = activated_account.phone_number
preferred_language = get_cached_preferred_language(activated_account.blockchain_address)
persisted_ussd_session.state = 'enter_transaction_recipient'
resp = handle_menu_operations(chain_str, external_session_id, phone, None, valid_service_codes[0], init_database, '1')
assert resp == translation_for('ussd.kenya.enter_transaction_recipient', preferred_language)
resp = handle_menu_operations(external_session_id, phone, None, valid_service_codes[0], init_database, '1')
assert resp == translation_for('ussd.enter_transaction_recipient', preferred_language)

View File

@ -10,7 +10,10 @@ from cic_types.models.person import get_contact_data_from_vcard
# local imports
from cic_ussd.account.metadata import get_cached_preferred_language
from cic_ussd.metadata import PersonMetadata
from cic_ussd.processor.util import latest_input, parse_person_metadata, resume_last_ussd_session
from cic_ussd.processor.util import (latest_input,
parse_person_metadata,
resume_last_ussd_session,
ussd_menu_list)
from cic_ussd.translation import translation_for
@ -32,7 +35,7 @@ def test_parse_person_metadata(activated_account, cache_person_metadata, cache_p
cached_person_metadata = person_metadata.get_cached_metadata()
person_metadata = json.loads(cached_person_metadata)
preferred_language = get_cached_preferred_language(activated_account.blockchain_address)
display_key = 'ussd.kenya.display_person_metadata'
display_key = 'ussd.display_person_metadata'
parsed_person_metadata = parse_person_metadata(cached_person_metadata,
display_key,
preferred_language)
@ -60,3 +63,20 @@ def test_parse_person_metadata(activated_account, cache_person_metadata, cache_p
])
def test_resume_last_ussd_session(expected_menu_name, last_state, load_ussd_menu):
assert resume_last_ussd_session(last_state).get('name') == expected_menu_name
def test_ussd_menu_list(activated_account, cache_preferences, load_ussd_menu, set_locale_files):
blockchain_address = activated_account.blockchain_address
preferred_language = get_cached_preferred_language(blockchain_address)
fallback = translation_for('helpers.no_transaction_history', preferred_language)
menu_list_sets = ['1. FII 50.0', '2. GFT 60.0', '3. DET 49.99']
split = 3
menu_list = ussd_menu_list(fallback=fallback, menu_list=menu_list_sets, split=split)
menu_list_sets = [menu_list_sets[item:item + split] for item in range(0, len(menu_list), split)]
menu_list_reprs = []
for i in range(split):
try:
menu_list_reprs.append(''.join(f'{list_set_item}\n' for list_set_item in menu_list_sets[i]).rstrip('\n'))
except IndexError:
menu_list_reprs.append(fallback)
assert menu_list == menu_list_reprs

View File

@ -3,8 +3,7 @@ import json
# external imports
import pytest
import requests_mock
from chainlib.hash import strip_0x
from cic_types.models.person import Person, get_contact_data_from_vcard
# local imports
@ -12,9 +11,7 @@ from cic_ussd.cache import get_cached_data
from cic_ussd.account.maps import gender
from cic_ussd.account.metadata import get_cached_preferred_language
from cic_ussd.db.enum import AccountStatus
from cic_ussd.metadata import PreferencesMetadata
from cic_ussd.state_machine.logic.account import (change_preferred_language,
edit_user_metadata_attribute,
from cic_ussd.state_machine.logic.account import (edit_user_metadata_attribute,
parse_gender,
parse_person_metadata,
save_complete_person_metadata,
@ -26,32 +23,6 @@ from cic_ussd.translation import translation_for
# test imports
@pytest.mark.parametrize('user_input, expected_preferred_language', [
('1', 'en'),
('2', 'sw')
])
def test_change_preferred_language(activated_account,
celery_session_worker,
expected_preferred_language,
init_database,
generic_ussd_session,
mock_response,
preferences,
setup_metadata_request_handler,
user_input):
identifier = bytes.fromhex(strip_0x(activated_account.blockchain_address))
preferences_metadata_client = PreferencesMetadata(identifier)
with requests_mock.Mocker(real_http=False) as requests_mocker:
requests_mocker.register_uri(
'POST', preferences_metadata_client.url, status_code=200, reason='OK', json=mock_response
)
state_machine_data = (user_input, generic_ussd_session, activated_account, init_database)
res = change_preferred_language(state_machine_data)
init_database.commit()
assert res.id is not None
assert activated_account.preferred_language == expected_preferred_language
@pytest.mark.parametrize('user_input', [
'1',
'2',

View File

@ -0,0 +1,52 @@
# standard imports
import json
# external imports
import requests_mock
from cic_types.condiments import MetadataPointer
# local imports
from cic_ussd.cache import cache_data_key, get_cached_data
from cic_ussd.metadata import PreferencesMetadata
from cic_ussd.state_machine.logic.language import (change_preferred_language,
is_valid_language_selection,
preferred_langauge_from_selection,
process_language_selection)
# test imports
def test_change_preferred_language(activated_account,
cached_ussd_session,
celery_session_worker,
init_database,
load_languages,
mocker,
setup_metadata_signer,
setup_metadata_request_handler):
ussd_session = get_cached_data(cached_ussd_session.external_session_id)
ussd_session = json.loads(ussd_session)
preferences = {
'preferred_language': 'en'
}
ussd_session['data'] = preferences
mock_add_preferences_metadata = mocker.patch('cic_ussd.tasks.metadata.add_preferences_metadata.apply_async')
with requests_mock.Mocker(real_http=False) as request_mocker:
identifier = bytes.fromhex(activated_account.blockchain_address)
metadata_client = PreferencesMetadata(identifier=identifier)
request_mocker.register_uri('POST', metadata_client.url, status_code=201, reason='CREATED', json=preferences)
state_machine_data = ('1', ussd_session, activated_account, init_database)
change_preferred_language(state_machine_data)
mock_add_preferences_metadata.assert_called_with(
(activated_account.blockchain_address, preferences), {}, queue='cic-ussd')
def test_is_valid_language_selection(activated_account,
generic_ussd_session,
init_cache,
init_database,
load_languages):
state_machine_data = ('1', generic_ussd_session, activated_account, init_database)
assert is_valid_language_selection(state_machine_data) is True
state_machine_data = ('12', generic_ussd_session, activated_account, init_database)
assert is_valid_language_selection(state_machine_data) is False

View File

@ -9,7 +9,10 @@ from cic_ussd.state_machine.logic.menu import (menu_one_selected,
menu_four_selected,
menu_five_selected,
menu_six_selected,
menu_nine_selected,
menu_zero_zero_selected,
menu_eleven_selected,
menu_twenty_two_selected,
menu_ninety_nine_selected)
# test imports
@ -29,8 +32,14 @@ def test_menu_selection(init_database, pending_account, persisted_ussd_session):
assert menu_five_selected(('e', ussd_session, pending_account, init_database)) is False
assert menu_six_selected(('6', ussd_session, pending_account, init_database)) is True
assert menu_six_selected(('8', ussd_session, pending_account, init_database)) is False
assert menu_nine_selected(('9', ussd_session, pending_account, init_database)) is True
assert menu_nine_selected(('-', ussd_session, pending_account, init_database)) is False
assert menu_zero_zero_selected(('00', ussd_session, pending_account, init_database)) is True
assert menu_zero_zero_selected(('/', ussd_session, pending_account, init_database)) is False
assert menu_eleven_selected(('11', ussd_session, pending_account, init_database)) is True
assert menu_eleven_selected(('*', ussd_session, pending_account, init_database)) is False
assert menu_twenty_two_selected(('22', ussd_session, pending_account, init_database)) is True
assert menu_twenty_two_selected(('5', ussd_session, pending_account, init_database)) is False
assert menu_ninety_nine_selected(('99', ussd_session, pending_account, init_database)) is True
assert menu_ninety_nine_selected(('d', ussd_session, pending_account, init_database)) is False

View File

@ -0,0 +1,221 @@
# standard imports
import json
# external imports
import requests_mock
# local imports
from cic_ussd.account.guardianship import Guardianship
from cic_ussd.account.metadata import get_cached_preferred_language
from cic_ussd.cache import cache_data_key, get_cached_data
from cic_ussd.db.models.account import Account
from cic_ussd.metadata import PersonMetadata
from cic_ussd.state_machine.logic.pin_guard import (add_pin_guardian,
is_dialers_pin_guardian,
is_others_pin_guardian,
is_set_pin_guardian,
remove_pin_guardian,
initiate_pin_reset,
save_guardian_to_session_data,
save_guarded_account_session_data,
retrieve_person_metadata,
is_valid_guardian_addition)
from cic_ussd.translation import translation_for
def test_save_guardian_to_session_data(activated_account,
cached_ussd_session,
celery_session_worker,
guardian_account,
init_cache,
init_database):
ussd_session = get_cached_data(cached_ussd_session.external_session_id)
ussd_session = json.loads(ussd_session)
ussd_session['msisdn'] = activated_account.phone_number
state_machine_data = (guardian_account.phone_number, ussd_session, activated_account, init_database)
save_guardian_to_session_data(state_machine_data)
ussd_session = get_cached_data(cached_ussd_session.external_session_id)
ussd_session = json.loads(ussd_session)
assert ussd_session.get('data').get('guardian_phone_number') == guardian_account.phone_number
def test_save_guarded_account_session_data(activated_account,
cached_ussd_session,
celery_session_worker,
guardian_account,
init_cache,
init_database):
ussd_session = get_cached_data(cached_ussd_session.external_session_id)
ussd_session = json.loads(ussd_session)
ussd_session['msisdn'] = guardian_account.phone_number
state_machine_data = (activated_account.phone_number, ussd_session, guardian_account, init_database)
save_guarded_account_session_data(state_machine_data)
ussd_session = get_cached_data(cached_ussd_session.external_session_id)
ussd_session = json.loads(ussd_session)
assert ussd_session.get('data').get('guarded_account_phone_number') == activated_account.phone_number
def test_retrieve_person_metadata(activated_account,
cached_ussd_session,
celery_session_worker,
guardian_account,
init_cache,
init_database,
mocker,
person_metadata,
setup_metadata_request_handler,
setup_metadata_signer):
ussd_session = get_cached_data(cached_ussd_session.external_session_id)
ussd_session = json.loads(ussd_session)
ussd_session['msisdn'] = activated_account.phone_number
state_machine_data = (guardian_account.phone_number, ussd_session, activated_account, init_database)
mocker_query_person_metadata = mocker.patch('cic_ussd.tasks.metadata.query_person_metadata.apply_async')
with requests_mock.Mocker(real_http=False) as request_mocker:
identifier = bytes.fromhex(activated_account.blockchain_address)
metadata_client = PersonMetadata(identifier)
request_mocker.register_uri('GET', metadata_client.url, json=person_metadata, reason='OK', status_code=200)
retrieve_person_metadata(state_machine_data)
mocker_query_person_metadata.assert_called_with((guardian_account.blockchain_address,), {}, queue='cic-ussd')
def test_is_valid_guardian_addition(activated_account,
cache_preferences,
cached_ussd_session,
celery_session_worker,
init_cache,
init_database,
guardian_account,
load_languages,
load_ussd_menu,
set_locale_files,
setup_guardianship):
blockchain_address = activated_account.blockchain_address
ussd_session = get_cached_data(cached_ussd_session.external_session_id)
ussd_session = json.loads(ussd_session)
state_machine_data = (guardian_account.phone_number, ussd_session, activated_account, init_database)
assert is_valid_guardian_addition(state_machine_data) is True
state_machine_data = (activated_account.phone_number, ussd_session, activated_account, init_database)
assert is_valid_guardian_addition(state_machine_data) is False
ussd_session = get_cached_data(cached_ussd_session.external_session_id)
ussd_session = json.loads(ussd_session)
preferred_language = get_cached_preferred_language(blockchain_address)
failure_reason = translation_for('helpers.error.is_initiator', preferred_language)
assert ussd_session.get('data').get('failure_reason') == failure_reason
state_machine_data = (Guardianship.guardians[0], ussd_session, activated_account, init_database)
assert is_valid_guardian_addition(state_machine_data) is False
ussd_session = get_cached_data(cached_ussd_session.external_session_id)
ussd_session = json.loads(ussd_session)
preferred_language = get_cached_preferred_language(blockchain_address)
failure_reason = translation_for('helpers.error.is_existent_guardian', preferred_language)
assert ussd_session.get('data').get('failure_reason') == failure_reason
def test_add_pin_guardian(activated_account, generic_ussd_session, guardian_account, init_database):
generic_ussd_session['data'] = {'guardian_phone_number': guardian_account.phone_number}
state_machine_data = ('', generic_ussd_session, activated_account, init_database)
add_pin_guardian(state_machine_data)
account = Account.get_by_phone_number(activated_account.phone_number, init_database)
assert account.get_guardians()[0] == guardian_account.phone_number
def test_is_set_pin_guardian(activated_account,
cache_preferences,
cached_ussd_session,
celery_session_worker,
init_cache,
init_database,
guardian_account,
load_languages,
load_ussd_menu,
set_locale_files,
setup_guardianship):
blockchain_address = activated_account.blockchain_address
ussd_session = get_cached_data(cached_ussd_session.external_session_id)
ussd_session = json.loads(ussd_session)
preferred_language = get_cached_preferred_language(blockchain_address)
assert is_set_pin_guardian(activated_account, guardian_account.phone_number, preferred_language, init_database,
ussd_session) is False
ussd_session = get_cached_data(cached_ussd_session.external_session_id)
ussd_session = json.loads(ussd_session)
failure_reason = translation_for('helpers.error.is_not_existent_guardian', preferred_language)
assert ussd_session.get('data').get('failure_reason') == failure_reason
assert is_set_pin_guardian(activated_account, Guardianship.guardians[0], preferred_language, init_database,
ussd_session) is True
assert is_set_pin_guardian(activated_account, activated_account.phone_number, preferred_language, init_database,
ussd_session) is False
ussd_session = get_cached_data(cached_ussd_session.external_session_id)
ussd_session = json.loads(ussd_session)
failure_reason = translation_for('helpers.error.is_initiator', preferred_language)
assert ussd_session.get('data').get('failure_reason') == failure_reason
def test_is_dialers_pin_guardian(activated_account,
cache_preferences,
cached_ussd_session,
celery_session_worker,
init_database,
guardian_account):
ussd_session = get_cached_data(cached_ussd_session.external_session_id)
ussd_session = json.loads(ussd_session)
state_machine_data = (guardian_account.phone_number, ussd_session, activated_account, init_database)
assert is_dialers_pin_guardian(state_machine_data) is False
activated_account.add_guardian(guardian_account.phone_number)
init_database.flush()
state_machine_data = (guardian_account.phone_number, ussd_session, activated_account, init_database)
assert is_dialers_pin_guardian(state_machine_data) is True
def test_is_others_pin_guardian(activated_account,
cache_preferences,
cached_ussd_session,
celery_session_worker,
init_database,
guardian_account):
ussd_session = get_cached_data(cached_ussd_session.external_session_id)
ussd_session = json.loads(ussd_session)
state_machine_data = (activated_account.phone_number, ussd_session, guardian_account, init_database)
assert is_others_pin_guardian(state_machine_data) is False
activated_account.add_guardian(guardian_account.phone_number)
init_database.flush()
state_machine_data = (activated_account.phone_number, ussd_session, guardian_account, init_database)
assert is_others_pin_guardian(state_machine_data) is True
def test_remove_pin_guardian(activated_account, generic_ussd_session, guardian_account, init_database):
generic_ussd_session['data'] = {'guardian_phone_number': guardian_account.phone_number}
activated_account.add_guardian(guardian_account.phone_number)
init_database.flush()
assert activated_account.get_guardians()[0] == guardian_account.phone_number
state_machine_data = ('', generic_ussd_session, activated_account, init_database)
remove_pin_guardian(state_machine_data)
assert len(activated_account.get_guardians()) == 0
def test_initiate_pin_reset(activated_account,
cache_preferences,
celery_session_worker,
cached_ussd_session,
guardian_account,
init_cache,
init_database,
load_ussd_menu,
mock_notifier_api,
set_locale_files):
ussd_session = get_cached_data(cached_ussd_session.external_session_id)
ussd_session = json.loads(ussd_session)
ussd_session['data'] = {'guarded_account_phone_number': activated_account.phone_number}
state_machine_data = ('', ussd_session, guardian_account, init_database)
initiate_pin_reset(state_machine_data)
blockchain_address = activated_account.blockchain_address
preferred_language = get_cached_preferred_language(blockchain_address)
message = translation_for('sms.pin_reset_initiated', preferred_language, pin_initiator=guardian_account.standard_metadata_id())
assert mock_notifier_api.get('message') == message
assert mock_notifier_api.get('recipient') == activated_account.phone_number

View File

@ -23,6 +23,7 @@ def test_upsell_unregistered_recipient(activated_account,
load_support_phone,
mock_notifier_api,
set_locale_files,
set_active_token,
valid_recipient):
cached_ussd_session.set_data('recipient_phone_number', valid_recipient.phone_number)
state_machine_data = ('', cached_ussd_session.to_json(), activated_account, init_database)

View File

@ -0,0 +1,69 @@
# standard imports
import json
# external imports
from cic_types.condiments import MetadataPointer
# local imports
from cic_ussd.cache import cache_data_key, get_cached_data
from cic_ussd.state_machine.logic.tokens import (is_valid_token_selection,
process_token_selection,
set_selected_active_token)
from cic_ussd.account.tokens import get_cached_token_data_list
# test imports
def test_is_valid_token_selection(activated_account,
cache_token_data_list,
cache_token_symbol_list,
cached_ussd_session,
init_cache,
init_database):
cached_token_data_list = get_cached_token_data_list(activated_account.blockchain_address)
ussd_session = get_cached_data(cached_ussd_session.external_session_id)
ussd_session = json.loads(ussd_session)
ussd_session['data'] = {'account_tokens_list': cached_token_data_list}
state_machine_data = ('GFT', ussd_session, activated_account, init_database)
assert is_valid_token_selection(state_machine_data) is True
state_machine_data = ('1', ussd_session, activated_account, init_database)
assert is_valid_token_selection(state_machine_data) is True
state_machine_data = ('3', ussd_session, activated_account, init_database)
assert is_valid_token_selection(state_machine_data) is False
def test_process_token_selection(activated_account,
cache_token_data_list,
cache_token_symbol_list,
cached_ussd_session,
celery_session_worker,
init_cache,
init_database):
cached_token_data_list = get_cached_token_data_list(activated_account.blockchain_address)
ussd_session = get_cached_data(cached_ussd_session.external_session_id)
ussd_session = json.loads(ussd_session)
ussd_session['data'] = {'account_tokens_list': cached_token_data_list}
state_machine_data = ('GFT', ussd_session, activated_account, init_database)
process_token_selection(state_machine_data)
ussd_session = get_cached_data(cached_ussd_session.external_session_id)
ussd_session = json.loads(ussd_session)
assert ussd_session.get('data').get('selected_token').get('symbol') == 'GFT'
def test_set_selected_active_token(activated_account,
cache_token_data_list,
cache_token_symbol_list,
cached_ussd_session,
init_cache,
init_database):
cached_token_data_list = get_cached_token_data_list(activated_account.blockchain_address)
ussd_session = get_cached_data(cached_ussd_session.external_session_id)
ussd_session = json.loads(ussd_session)
ussd_session['data'] = {'selected_token': cached_token_data_list[0]}
state_machine_data = ('GFT', ussd_session, activated_account, init_database)
set_selected_active_token(state_machine_data)
identifier = bytes.fromhex(activated_account.blockchain_address)
key = cache_data_key(identifier=identifier, salt=MetadataPointer.TOKEN_ACTIVE)
active_token = get_cached_data(key)
assert active_token == 'GFT'

View File

@ -3,13 +3,12 @@ import json
# external imports
import pytest
import requests_mock
from chainlib.hash import strip_0x
# local imports
from cic_ussd.account.metadata import get_cached_preferred_language
from cic_ussd.account.tokens import get_active_token_symbol, get_cached_token_data
from cic_ussd.account.transaction import to_wei
from cic_ussd.cache import get_cached_data
from cic_ussd.metadata import PersonMetadata
from cic_ussd.state_machine.logic.transaction import (is_valid_recipient,
is_valid_transaction_amount,
has_sufficient_balance,
@ -18,7 +17,6 @@ from cic_ussd.state_machine.logic.transaction import (is_valid_recipient,
save_recipient_phone_to_session_data,
save_transaction_amount_to_session_data)
# test imports
@ -49,17 +47,18 @@ def test_is_valid_transaction_amount(activated_account, amount, expected_result,
])
def test_has_sufficient_balance(activated_account,
cache_balances,
cache_default_token_data,
cache_token_data,
expected_result,
generic_ussd_session,
init_database,
set_active_token,
value):
state_machine_data = (value, generic_ussd_session, activated_account, init_database)
assert has_sufficient_balance(state_machine_data=state_machine_data) == expected_result
def test_process_transaction_request(activated_account,
cache_default_token_data,
cache_token_data,
cached_ussd_session,
celery_session_worker,
init_cache,
@ -67,7 +66,12 @@ def test_process_transaction_request(activated_account,
load_chain_spec,
load_config,
mock_transfer_api,
set_active_token,
valid_recipient):
blockchain_address = activated_account.blockchain_address
token_symbol = get_active_token_symbol(blockchain_address)
token_data = get_cached_token_data(blockchain_address, token_symbol)
decimals = token_data.get("decimals")
cached_ussd_session.set_data('recipient_phone_number', valid_recipient.phone_number)
cached_ussd_session.set_data('transaction_amount', '50')
ussd_session = get_cached_data(cached_ussd_session.external_session_id)
@ -76,7 +80,7 @@ def test_process_transaction_request(activated_account,
process_transaction_request(state_machine_data)
assert mock_transfer_api['from_address'] == activated_account.blockchain_address
assert mock_transfer_api['to_address'] == valid_recipient.blockchain_address
assert mock_transfer_api['value'] == to_wei(50)
assert mock_transfer_api['value'] == to_wei(decimals, 50)
assert mock_transfer_api['token_symbol'] == load_config.get('TEST_TOKEN_SYMBOL')

View File

@ -6,8 +6,10 @@ def test_state_machine(activated_account_ussd_session,
celery_session_worker,
init_database,
init_state_machine,
pending_account):
load_languages,
pending_account,
set_locale_files):
state_machine = UssdStateMachine(activated_account_ussd_session)
state_machine.scan_data(('1', activated_account_ussd_session, pending_account, init_database))
assert state_machine.__repr__() == f'<KenyaUssdStateMachine: {state_machine.state}>'
assert state_machine.state == 'initial_pin_entry'
assert state_machine.state == 'account_creation_prompt'

View File

@ -4,15 +4,18 @@ import json
# external imports
import celery
import pytest
import requests_mock
from chainlib.hash import strip_0x
from cic_types.condiments import MetadataPointer
# local imports
from cic_ussd.account.statement import filter_statement_transactions
from cic_ussd.account.tokens import collate_token_metadata
from cic_ussd.account.transaction import transaction_actors
from cic_ussd.cache import cache_data_key, get_cached_data
from cic_ussd.db.models.account import Account
from cic_ussd.error import AccountCreationDataNotFound
from cic_ussd.metadata import TokenMetadata
# test imports
@ -22,11 +25,13 @@ from tests.helpers.accounts import blockchain_address
def test_account_creation_callback(account_creation_data,
cache_account_creation_data,
celery_session_worker,
cache_default_token_data,
custom_metadata,
init_cache,
init_database,
load_chain_spec,
mocker,
preferences,
setup_metadata_request_handler,
setup_metadata_signer):
phone_number = account_creation_data.get('phone_number')
@ -48,10 +53,12 @@ def test_account_creation_callback(account_creation_data,
cached_account_creation_data = get_cached_data(task_uuid)
cached_account_creation_data = json.loads(cached_account_creation_data)
assert cached_account_creation_data.get('status') == account_creation_data.get('status')
mock_add_preferences_metadata = mocker.patch('cic_ussd.tasks.metadata.add_preferences_metadata.apply_async')
mock_add_phone_pointer = mocker.patch('cic_ussd.tasks.metadata.add_phone_pointer.apply_async')
mock_add_custom_metadata = mocker.patch('cic_ussd.tasks.metadata.add_custom_metadata.apply_async')
preferred_language = preferences.get('preferred_language')
s_account_creation_callback = celery.signature(
'cic_ussd.tasks.callback_handler.account_creation_callback', [result, '', 0]
'cic_ussd.tasks.callback_handler.account_creation_callback', [result, preferred_language, 0]
)
s_account_creation_callback.apply_async().get()
account = init_database.query(Account).filter_by(phone_number=phone_number).first()
@ -59,6 +66,7 @@ def test_account_creation_callback(account_creation_data,
cached_account_creation_data = get_cached_data(task_uuid)
cached_account_creation_data = json.loads(cached_account_creation_data)
assert cached_account_creation_data.get('status') == 'CREATED'
mock_add_preferences_metadata.assert_called_with((result, preferences), {}, queue='cic-ussd')
mock_add_phone_pointer.assert_called_with((result, phone_number), {}, queue='cic-ussd')
mock_add_custom_metadata.assert_called_with((result, custom_metadata), {}, queue='cic-ussd')
@ -117,12 +125,46 @@ def test_statement_callback(activated_account, mocker, transactions_list):
(activated_account.blockchain_address, sender_transaction), {}, queue='cic-ussd')
def test_token_data_callback(activated_account,
cache_token_data,
cache_token_meta_symbol,
cache_token_proof_symbol,
celery_session_worker,
default_token_data,
init_cache,
token_meta_symbol,
token_symbol):
blockchain_address = activated_account.blockchain_address
identifier = token_symbol.encode('utf-8')
status_code = 1
with pytest.raises(ValueError) as error:
s_token_data_callback = celery.signature(
'cic_ussd.tasks.callback_handler.token_data_callback',
[[default_token_data], blockchain_address, status_code])
s_token_data_callback.apply_async().get()
assert str(error.value) == f'Unexpected status code: {status_code}.'
token_data_key = cache_data_key([bytes.fromhex(blockchain_address), identifier], MetadataPointer.TOKEN_DATA)
token_meta_key = cache_data_key(identifier, MetadataPointer.TOKEN_META_SYMBOL)
token_info_key = cache_data_key(identifier, MetadataPointer.TOKEN_PROOF_SYMBOL)
token_meta = get_cached_data(token_meta_key)
token_meta = json.loads(token_meta)
token_info = get_cached_data(token_info_key)
token_info = json.loads(token_info)
token_data = collate_token_metadata(token_info=token_info, token_metadata=token_meta)
token_data = {**token_data, **default_token_data}
cached_token_data = json.loads(get_cached_data(token_data_key))
for key, value in token_data.items():
assert token_data[key] == cached_token_data[key]
def test_transaction_balances_callback(activated_account,
balances,
cache_balances,
cache_default_token_data,
cache_token_data,
cache_person_metadata,
cache_preferences,
celery_session_worker,
load_chain_spec,
mocker,
preferences,
@ -157,7 +199,16 @@ def test_transaction_balances_callback(activated_account,
mocked_chain.assert_called()
def test_transaction_callback(load_chain_spec, mock_async_balance_api_query, transaction_result):
def test_transaction_callback(cache_token_data,
celery_session_worker,
default_token_data,
init_cache,
load_chain_spec,
mock_async_balance_api_query,
token_symbol,
token_meta_symbol,
token_proof_symbol,
transaction_result):
status_code = 1
with pytest.raises(ValueError) as error:
s_transaction_callback = celery.signature(
@ -166,13 +217,19 @@ def test_transaction_callback(load_chain_spec, mock_async_balance_api_query, tra
s_transaction_callback.apply_async().get()
assert str(error.value) == f'Unexpected status code: {status_code}.'
status_code = 0
s_transaction_callback = celery.signature(
'cic_ussd.tasks.callback_handler.transaction_callback',
[transaction_result, 'transfer', status_code])
s_transaction_callback.apply_async().get()
recipient_transaction, sender_transaction = transaction_actors(transaction_result)
assert mock_async_balance_api_query.get('address') == recipient_transaction.get('blockchain_address') or sender_transaction.get('blockchain_address')
assert mock_async_balance_api_query.get('token_symbol') == recipient_transaction.get('token_symbol') or sender_transaction.get('token_symbol')
with requests_mock.Mocker(real_http=False) as request_mocker:
identifier = token_symbol.encode('utf-8')
metadata_client = TokenMetadata(identifier, cic_type=MetadataPointer.TOKEN_META_SYMBOL)
request_mocker.register_uri('GET', metadata_client.url, json=token_meta_symbol, status_code=200, reason='OK')
metadata_client = TokenMetadata(identifier, cic_type=MetadataPointer.TOKEN_PROOF_SYMBOL)
request_mocker.register_uri('GET', metadata_client.url, json=token_proof_symbol, status_code=200, reason='OK')
status_code = 0
s_transaction_callback = celery.signature(
'cic_ussd.tasks.callback_handler.transaction_callback',
[transaction_result, 'transfer', status_code])
s_transaction_callback.apply_async().get()
recipient_transaction, sender_transaction = transaction_actors(transaction_result)
assert mock_async_balance_api_query.get('address') == recipient_transaction.get('blockchain_address') or sender_transaction.get('blockchain_address')
assert mock_async_balance_api_query.get('token_symbol') == recipient_transaction.get('token_symbol') or sender_transaction.get('token_symbol')

View File

@ -14,13 +14,14 @@ from cic_ussd.translation import translation_for
def test_transaction(cache_default_token_data,
cache_token_data,
celery_session_worker,
load_support_phone,
mock_notifier_api,
notification_data,
set_locale_files):
notification_data['transaction_type'] = 'transfer'
amount = from_wei(notification_data.get('token_value'))
amount = from_wei(6, notification_data.get('token_value'))
balance = notification_data.get('available_balance')
phone_number = notification_data.get('phone_number')
preferred_language = notification_data.get('preferred_language')

View File

@ -52,6 +52,11 @@ def test_cache_statement(activated_account,
cached_statement = get_cached_data(key)
cached_statement = json.loads(cached_statement)
assert len(cached_statement) == 1
sender_transaction['token_value'] = 60.0
s_parse_transaction = celery.signature(
'cic_ussd.tasks.processor.parse_transaction', [sender_transaction])
result = s_parse_transaction.apply_async().get()
s_cache_statement = celery.signature(
'cic_ussd.tasks.processor.cache_statement', [result, activated_account.blockchain_address]
)

View File

@ -8,8 +8,8 @@ from cic_ussd.notifications import Notifier
@pytest.mark.parametrize("key, preferred_language, recipient, expected_message", [
("ussd.kenya.exit", "en", "+254712345678", "END Thank you for using the service."),
("ussd.kenya.exit", "sw", "+254712345678", "END Asante kwa kutumia huduma.")
("ussd.exit", "en", "+254712345678", "END Thank you for using the service."),
("ussd.exit", "sw", "+254712345678", "END Asante kwa kutumia huduma")
])
def test_send_sms_notification(celery_session_worker,
expected_message,

View File

@ -10,12 +10,12 @@ from cic_ussd.translation import translation_for
def test_translation_for(set_locale_files):
english_translation = translation_for(
key='ussd.kenya.exit_invalid_request',
key='ussd.exit_invalid_request',
preferred_language='en'
)
swahili_translation = translation_for(
key='ussd.kenya.exit_invalid_request',
key='ussd.exit_invalid_request',
preferred_language='sw'
)
assert swahili_translation == 'END Chaguo si sahihi.'
assert swahili_translation == 'END Chaguo si sahihi'
assert english_translation == 'END Invalid request.'

View File

@ -8,6 +8,7 @@ from cic_types.condiments import MetadataPointer
# local imports
from cic_ussd.account.chain import Chain
from cic_ussd.account.tokens import set_active_token
from cic_ussd.cache import cache_data, cache_data_key
from cic_ussd.db.enum import AccountStatus
from cic_ussd.db.models.account import Account
@ -36,6 +37,16 @@ def activated_account(init_database, set_fernet_key):
return account
@pytest.fixture(scope='function')
def guardian_account(init_database, set_fernet_key):
account = Account(blockchain_address(), phone_number())
account.create_password('0000')
account.activate_account()
init_database.add(account)
init_database.commit()
return account
@pytest.fixture(scope='function')
def balances():
return [{
@ -53,13 +64,22 @@ def cache_account_creation_data(init_cache, account_creation_data):
@pytest.fixture(scope='function')
def cache_balances(activated_account, balances, init_cache):
identifier = bytes.fromhex(activated_account.blockchain_address)
def cache_balances(activated_account, balances, init_cache, token_symbol):
identifier = [bytes.fromhex(activated_account.blockchain_address), token_symbol.encode('utf-8')]
balances = json.dumps(balances[0])
key = cache_data_key(identifier, MetadataPointer.BALANCES)
cache_data(key, balances)
@pytest.fixture(scope='function')
def cache_adjusted_balances(activated_account, balances, init_cache, token_symbol):
identifier = bytes.fromhex(activated_account.blockchain_address)
balances_identifier = [identifier, token_symbol.encode('utf-8')]
key = cache_data_key(balances_identifier, MetadataPointer.BALANCES_ADJUSTED)
adjusted_balance = 45931650.64654012
cache_data(key, adjusted_balance)
@pytest.fixture(scope='function')
def cache_default_token_data(default_token_data, init_cache, load_chain_spec):
chain_str = Chain.spec.__str__()
@ -68,6 +88,113 @@ def cache_default_token_data(default_token_data, init_cache, load_chain_spec):
cache_data(key, data)
@pytest.fixture(scope='function')
def set_active_token(activated_account, init_cache, token_symbol):
identifier = bytes.fromhex(activated_account.blockchain_address)
key = cache_data_key(identifier, MetadataPointer.TOKEN_ACTIVE)
cache_data(key=key, data=token_symbol)
@pytest.fixture(scope='function')
def cache_token_data(activated_account, init_cache, token_data):
identifier = [bytes.fromhex(activated_account.blockchain_address), token_data.get('symbol').encode('utf-8')]
key = cache_data_key(identifier, MetadataPointer.TOKEN_DATA)
cache_data(key=key, data=json.dumps(token_data))
@pytest.fixture(scope='function')
def cache_token_symbol_list(activated_account, init_cache, token_symbol):
identifier = bytes.fromhex(activated_account.blockchain_address)
key = cache_data_key(identifier=identifier, salt=MetadataPointer.TOKEN_SYMBOLS_LIST)
token_symbols_list = [token_symbol]
cache_data(key, json.dumps(token_symbols_list))
@pytest.fixture(scope='function')
def cache_token_data_list(activated_account, init_cache, token_data):
identifier = bytes.fromhex(activated_account.blockchain_address)
key = cache_data_key(identifier, MetadataPointer.TOKEN_DATA_LIST)
token_data_list = [token_data]
cache_data(key, json.dumps(token_data_list))
@pytest.fixture(scope='function')
def token_meta_symbol():
return {
"contact": {
"phone": "+254700000000",
"email": "info@grassrootseconomics.org"
},
"country_code": "KE",
"location": "Kilifi",
"name": "GRASSROOTS ECONOMICS"
}
@pytest.fixture(scope='function')
def token_proof_symbol():
return {
"description": "Community support",
"issuer": "Grassroots Economics",
"namespace": "ge",
"proofs": [
"0x4746540000000000000000000000000000000000000000000000000000000000",
"1f0f0e3e9db80eeaba22a9d4598e454be885855d6048545546fd488bb709dc2f"
],
"version": 0
}
@pytest.fixture(scope='function')
def token_list_entries():
return [
{
'name': 'Fee',
'symbol': 'FII',
'issuer': 'Foo',
'contact': {'phone': '+254712345678'},
'location': 'Fum',
'balance': 50.0
},
{
'name': 'Giftable Token',
'symbol': 'GFT',
'issuer': 'Grassroots Economics',
'contact': {
'phone': '+254700000000',
'email': 'info@grassrootseconomics.org'
},
'location': 'Fum',
'balance': 60.0
},
{
'name': 'Demurrage Token',
'symbol': 'DET',
'issuer': 'Grassroots Economics',
'contact': {
'phone': '+254700000000',
'email': 'info@grassrootseconomics.org'
},
'location': 'Fum',
'balance': 49.99
}
]
@pytest.fixture(scope='function')
def cache_token_meta_symbol(token_meta_symbol, token_symbol):
identifier = token_symbol.encode('utf-8')
key = cache_data_key(identifier, MetadataPointer.TOKEN_META_SYMBOL)
cache_data(key, json.dumps(token_meta_symbol))
@pytest.fixture(scope='function')
def cache_token_proof_symbol(token_proof_symbol, token_symbol):
identifier = token_symbol.encode('utf-8')
key = cache_data_key(identifier, MetadataPointer.TOKEN_PROOF_SYMBOL)
cache_data(key, json.dumps(token_proof_symbol))
@pytest.fixture(scope='function')
def cache_person_metadata(activated_account, init_cache, person_metadata):
identifier = bytes.fromhex(activated_account.blockchain_address)
@ -100,10 +227,33 @@ def custom_metadata():
@pytest.fixture(scope='function')
def default_token_data(token_symbol):
return {
'symbol': token_symbol,
'address': blockchain_address(),
'name': 'Giftable',
'decimals': 6
'symbol': token_symbol,
'address': '32e860c2a0645d1b7b005273696905f5d6dc5d05',
'name': 'Giftable Token',
'decimals': 6,
"converters": []
}
@pytest.fixture(scope='function')
def token_data():
return {
"description": "Community support",
"issuer": "Grassroots Economics",
"location": "Kilifi",
"contact": {
"phone": "+254700000000",
"email": "info@grassrootseconomics.org"
},
"decimals": 6,
"name": "Giftable Token",
"symbol": "GFT",
"address": "32e860c2a0645d1b7b005273696905f5d6dc5d05",
"proofs": [
"0x4746540000000000000000000000000000000000000000000000000000000000",
"1f0f0e3e9db80eeaba22a9d4598e454be885855d6048545546fd488bb709dc2f"
],
"converters": []
}

View File

@ -2,14 +2,18 @@
# external imports
import pytest
from pytest_redis import factories
# local imports
from cic_ussd.cache import Cache
from cic_ussd.session.ussd_session import UssdSession
redis_test_proc = factories.redis_proc()
redis_db = factories.redisdb('redis_test_proc', decode=True)
@pytest.fixture(scope='function')
def init_cache(redisdb):
Cache.store = redisdb
UssdSession.store = redisdb
return redisdb
def init_cache(redis_db):
Cache.store = redis_db
UssdSession.store = redis_db
return redis_db

View File

@ -10,11 +10,13 @@ from confini import Config
# local imports
from cic_ussd.account.chain import Chain
from cic_ussd.account.guardianship import Guardianship
from cic_ussd.encoder import PasswordEncoder
from cic_ussd.files.local_files import create_local_file_data_stores, json_file_parser
from cic_ussd.menu.ussd_menu import UssdMenu
from cic_ussd.phone_number import E164Format, Support
from cic_ussd.state_machine import UssdStateMachine
from cic_ussd.translation import generate_locale_files, Languages
from cic_ussd.validator import validate_presence
logg = logging.getLogger(__name__)
@ -39,6 +41,14 @@ def init_state_machine(load_config):
UssdStateMachine.transitions = json_file_parser(filepath=load_config.get('MACHINE_TRANSITIONS'))
@pytest.fixture(scope='function')
def load_languages(init_cache, load_config):
validate_presence(load_config.get('LANGUAGES_FILE'))
Languages.load_languages_dict(load_config.get('LANGUAGES_FILE'))
languages = Languages()
languages.cache_system_languages()
@pytest.fixture(scope='function')
def load_chain_spec(load_config):
chain_spec = ChainSpec.from_chain_str(load_config.get('CHAIN_SPEC'))
@ -75,8 +85,23 @@ def set_fernet_key(load_config):
PasswordEncoder.set_key(load_config.get('APP_PASSWORD_PEPPER'))
@pytest.fixture
def set_locale_files(load_config):
validate_presence(load_config.get('LOCALE_PATH'))
i18n.load_path.append(load_config.get('LOCALE_PATH'))
@pytest.fixture(scope='function')
def setup_guardianship(load_config):
guardians_file = os.path.join(root_directory, load_config.get('SYSTEM_GUARDIANS_FILE'))
validate_presence(guardians_file)
Guardianship.load_system_guardians(guardians_file)
@pytest.fixture(scope="session")
def set_locale_files(load_config, tmpdir_factory):
tmpdir = tmpdir_factory.mktemp("var")
tmpdir_path = str(tmpdir)
validate_presence(tmpdir_path)
import cic_translations
package_path = cic_translations.__path__
schema_files = os.path.join(package_path[0], load_config.get("SCHEMA_FILE_PATH"))
generate_locale_files(locale_dir=tmpdir_path,
schema_file_path=schema_files,
translation_builder_path=load_config.get('LOCALE_FILE_BUILDERS'))
i18n.load_path.append(tmpdir_path)
i18n.set('fallback', load_config.get('LOCALE_FALLBACK'))

View File

@ -40,6 +40,7 @@ def statement(activated_account):
'blockchain_address': activated_account.blockchain_address,
'token_symbol': 'GFT',
'token_value': 25000000,
'token_decimals': 6,
'role': 'sender',
'action_tag': 'Sent',
'direction_tag': 'To',
@ -63,7 +64,7 @@ def transaction_result(activated_account, load_config, valid_recipient):
'destination_token_symbol': load_config.get('TEST_TOKEN_SYMBOL'),
'source_token_decimals': 6,
'destination_token_decimals': 6,
'chain': 'evm:bloxberg:8996'
'chain': load_config.get('CHAIN_SPEC')
}

View File

@ -1,21 +1,142 @@
[
{
"trigger": "scan_data",
"source": "select_preferred_language",
"source": "initial_language_selection",
"dest": "account_creation_prompt",
"after": "cic_ussd.state_machine.logic.account.process_account_creation",
"conditions": "cic_ussd.state_machine.logic.language.is_valid_language_selection"
},
{
"trigger": "scan_data",
"source": "initial_language_selection",
"dest": "initial_middle_language_set",
"conditions": "cic_ussd.state_machine.logic.menu.menu_eleven_selected"
},
{
"trigger": "scan_data",
"source": "initial_language_selection",
"dest": "exit",
"after": "cic_ussd.state_machine.logic.account.change_preferred_language",
"conditions": "cic_ussd.state_machine.logic.menu.menu_one_selected"
"conditions": "cic_ussd.state_machine.logic.menu.menu_zero_zero_selected"
},
{
"trigger": "scan_data",
"source": "initial_middle_language_set",
"dest": "initial_language_selection",
"conditions": "cic_ussd.state_machine.logic.menu.menu_twenty_two_selected"
},
{
"trigger": "scan_data",
"source": "initial_middle_language_set",
"dest": "initial_last_language_set",
"conditions": "cic_ussd.state_machine.logic.menu.menu_eleven_selected"
},
{
"trigger": "scan_data",
"source": "initial_middle_language_set",
"dest": "exit",
"conditions": "cic_ussd.state_machine.logic.menu.menu_zero_zero_selected"
},
{
"trigger": "scan_data",
"source": "initial_last_language_set",
"dest": "initial_middle_language_set",
"conditions": "cic_ussd.state_machine.logic.menu.menu_twenty_two_selected"
},
{
"trigger": "scan_data",
"source": "initial_last_language_set",
"dest": "exit",
"conditions": "cic_ussd.state_machine.logic.menu.menu_zero_zero_selected"
},
{
"trigger": "scan_data",
"source": "initial_middle_language_set",
"dest": "exit",
"conditions": "cic_ussd.state_machine.logic.menu.menu_zero_zero_selected"
},
{
"trigger": "scan_data",
"source": "initial_language_selection",
"dest": "exit_invalid_menu_option"
},
{
"trigger": "scan_data",
"source": "middle_language_set",
"dest": "exit_invalid_menu_option"
},
{
"trigger": "scan_data",
"source": "last_language_set",
"dest": "exit_invalid_menu_option"
},
{
"trigger": "scan_data",
"source": "select_preferred_language",
"dest": "exit",
"after": "cic_ussd.state_machine.logic.account.change_preferred_language",
"conditions": "cic_ussd.state_machine.logic.menu.menu_two_selected"
"after": "cic_ussd.state_machine.logic.language.change_preferred_language",
"conditions": "cic_ussd.state_machine.logic.language.is_valid_language_selection"
},
{
"trigger": "scan_data",
"source": "select_preferred_language",
"dest": "middle_language_set",
"conditions": "cic_ussd.state_machine.logic.menu.menu_eleven_selected"
},
{
"trigger": "scan_data",
"source": "select_preferred_language",
"dest": "exit",
"conditions": "cic_ussd.state_machine.logic.menu.menu_zero_zero_selected"
},
{
"trigger": "scan_data",
"source": "middle_language_set",
"dest": "select_preferred_language",
"conditions": "cic_ussd.state_machine.logic.menu.menu_twenty_two_selected"
},
{
"trigger": "scan_data",
"source": "middle_language_set",
"dest": "last_language_set",
"conditions": "cic_ussd.state_machine.logic.menu.menu_eleven_selected"
},
{
"trigger": "scan_data",
"source": "middle_language_set",
"dest": "exit",
"conditions": "cic_ussd.state_machine.logic.menu.menu_zero_zero_selected"
},
{
"trigger": "scan_data",
"source": "last_language_set",
"dest": "middle_language_set",
"conditions": "cic_ussd.state_machine.logic.menu.menu_twenty_two_selected"
},
{
"trigger": "scan_data",
"source": "last_language_set",
"dest": "exit",
"conditions": "cic_ussd.state_machine.logic.menu.menu_zero_zero_selected"
},
{
"trigger": "scan_data",
"source": "middle_language_set",
"dest": "exit",
"conditions": "cic_ussd.state_machine.logic.menu.menu_zero_zero_selected"
},
{
"trigger": "scan_data",
"source": "select_preferred_language",
"dest": "exit_invalid_menu_option"
},
{
"trigger": "scan_data",
"source": "middle_language_set",
"dest": "exit_invalid_menu_option"
},
{
"trigger": "scan_data",
"source": "last_language_set",
"dest": "exit_invalid_menu_option"
}
]

View File

@ -1,29 +1,4 @@
[
{
"trigger": "scan_data",
"source": "initial_language_selection",
"dest": "initial_pin_entry",
"after": "cic_ussd.state_machine.logic.account.change_preferred_language",
"conditions": "cic_ussd.state_machine.logic.menu.menu_one_selected"
},
{
"trigger": "scan_data",
"source": "initial_language_selection",
"dest": "initial_pin_entry",
"after": "cic_ussd.state_machine.logic.account.change_preferred_language",
"conditions": "cic_ussd.state_machine.logic.menu.menu_two_selected"
},
{
"trigger": "scan_data",
"source": "initial_language_selection",
"dest": "help",
"conditions": "cic_ussd.state_machine.logic.menu.menu_three_selected"
},
{
"trigger": "scan_data",
"source": "initial_language_selection",
"dest": "exit_invalid_menu_option"
},
{
"trigger": "scan_data",
"source": "initial_pin_entry",
@ -39,7 +14,6 @@
{
"trigger": "scan_data",
"source": "initial_pin_confirmation",
"unless": "cic_ussd.state_machine.logic.validator.has_cached_person_metadata",
"conditions": "cic_ussd.state_machine.logic.pin.pins_match",
"dest": "start",
"after": [

View File

@ -1,36 +0,0 @@
en:
female: |-
Female
from: |-
From
male: |-
Male
not_provided: |-
Not provided
no_transaction_history: |-
No transaction history
no_tokens_list: |-
No tokens to list
other: |-
Other
received: |-
Received
sent: |-
Sent
to: |-
To
guardians_list_header: |-
Walinzi uliowaongeza ni:
no_guardians_list: |-
No guardians set
error:
no_phone_number_provided: |-
No phone number was provided.
no_matching_account: |-
The number provided is not registered.
is_initiator: |-
Phone number cannot be your own.
is_existent_guardian: |-
This phone number is is already added as a guardian.
is_not_existent_guardian: |-
Phone number not set as PIN reset guardian.

View File

@ -1,36 +0,0 @@
sw:
female: |-
Mwanamke
from: |-
Kutoka kwa
male: |-
Mwanaume
not_provided: |-
Haijawekwa
no_transaction_history: |-
Hamna ripoti ya matumizi
no_tokens_list: |-
Hamna sarafu nyingine
other: |-
Nyingine
received: |-
Ulipokea
sent: |-
Ulituma
to: |-
Kwa
guardians_list_header: |-
Your set guardians are:
no_guardians_list: |-
Hamna walinzi walioongezwa
error:
no_phone_number_provided: |-
Namabari ya simu haijawekwa.
no_matching_account: |-
Nambari uliyoweka haijasajiliwa.
is_initiator: |-
Nambari yafaa kuwa tofauti na yako.
is_existent_guardian: |-
Namabari hii tayari imeongezwa kama mlinzi wa nambari ya siri.
is_not_existent_guardian: |-
Nambari hii haijaongezwa kama mlinzi wa nambari ya siri.

View File

@ -1,11 +0,0 @@
en:
account_successfully_created: |-
You have been registered on Sarafu Network! To use dial *384*96# on Safaricom and *483*96# on other networks. For help %{support_phone}.
received_tokens: |-
Successfully received %{amount} %{token_symbol} from %{tx_sender_information} %{timestamp} to %{tx_recipient_information}. New balance is %{balance} %{token_symbol}.
sent_tokens: |-
Successfully sent %{amount} %{token_symbol} to %{tx_recipient_information} %{timestamp} from %{tx_sender_information}. New balance is %{balance} %{token_symbol}.
terms: |-
By using the service, you agree to the terms and conditions at http://grassecon.org/tos
upsell_unregistered_recipient: |-
%{tx_sender_information} tried to send you %{token_symbol} but you are not registered. To use dial *384*96# on Safaricom and *483*96# on other networks. For help %{support_phone}.

View File

@ -1,11 +0,0 @@
sw:
account_successfully_created: |-
Umesajiliwa kwa huduma ya Sarafu! Kutumia bonyeza *384*96# Safaricom ama *483*46# kwa utandao tofauti. Kwa Usaidizi %{support_phone}.
received_tokens: |-
Umepokea %{amount} %{token_symbol} kutoka kwa %{tx_sender_information} %{timestamp} ikapokewa na %{tx_recipient_information}. Salio lako ni %{balance} %{token_symbol}.
sent_tokens: |-
Umetuma %{amount} %{token_symbol} kwa %{tx_recipient_information} %{timestamp} kutoka kwa %{tx_sender_information}. Salio lako ni %{balance} %{token_symbol}.
terms: |-
Kwa kutumia hii huduma, umekubali sheria na masharti yafuatayo http://grassecon.org/tos
upsell_unregistered_recipient: |-
%{tx_sender_information} amejaribu kukutumia %{token_symbol} lakini hujasajili. Kutumia bonyeza *384*96# Safaricom ama *483*46# kwa utandao tofauti. Kwa Usaidizi %{support_phone}.

Some files were not shown because too many files have changed in this diff Show More