Compare commits

...

12 Commits

Author SHA1 Message Date
nolash
e0a980363c Set value db fields in cic_cache to handle biiig numbers 2021-03-05 20:45:01 +01:00
9d36a5f92f Merge branch 'philip/dry-run-fixes' into 'master'
Get that "ussd_menu.json" out there.

See merge request grassrootseconomics/cic-internal-integration!50
2021-03-05 19:22:44 +00:00
2fe6f4125f Get that "ussd_menu.json" out there. 2021-03-05 22:09:22 +03:00
b5c50b348d Merge branch 'philip/dry-run-prep' into 'master'
Philip/dry run prep

See merge request grassrootseconomics/cic-internal-integration!49
2021-03-05 16:28:08 +00:00
ea336283dc Philip/dry run prep 2021-03-05 16:28:07 +00:00
aa99b16ad2 Merge branch 'lash/fix-tx-list' into 'master'
Make tx listing tasks work properly

See merge request grassrootseconomics/cic-internal-integration!45
2021-03-04 16:47:13 +00:00
Louis Holbrook
1e7fff0133 Minor refactors:
- Renames s_assemble to s_brief
-  Link s_local to s_brief
2021-03-04 16:47:13 +00:00
21c9d95c4b Merge branch 'lash/transfer-authorization' into 'master'
cic-eth: Introduce transfer authorization contract

See merge request grassrootseconomics/cic-internal-integration!47
2021-03-04 15:06:15 +00:00
Louis Holbrook
8240e58c0a cic-eth: Introduce transfer authorization contract 2021-03-04 15:06:14 +00:00
1e9bf6b4d3 Update .cic-template.yml 2021-03-04 14:39:32 +00:00
7e089e1083 Merge branch 'bvander/cic-meta-entrypoint-update' into 'master'
meta isn't using the compose command its in the the Dockerfile

See merge request grassrootseconomics/cic-internal-integration!42
2021-03-03 16:26:20 +00:00
f940d4a961 meta isn't using this. The entrypoin is in the Dockerfile 2021-02-24 06:47:34 -08:00
127 changed files with 2595 additions and 1171 deletions

View File

@@ -29,8 +29,8 @@ def upgrade():
sa.Column('source_token', sa.String(42), nullable=False),
sa.Column('destination_token', sa.String(42), nullable=False),
sa.Column('success', sa.Boolean, nullable=False),
sa.Column('from_value', sa.BIGINT(), nullable=False),
sa.Column('to_value', sa.BIGINT(), nullable=False),
sa.Column('from_value', sa.NUMERIC(), nullable=False),
sa.Column('to_value', sa.NUMERIC(), nullable=False),
sa.Column('date_block', sa.DateTime, nullable=False),
)
op.create_table(

View File

@@ -1,5 +1,5 @@
[database]
NAME=cic-eth
NAME=cic_cache
USER=postgres
PASSWORD=
HOST=localhost

View File

@@ -1,12 +1,25 @@
# standard imports
import datetime
# external imports
import celery
# local imports
from cic_eth.db.models.debug import Debug
from cic_eth.db.models.base import SessionBase
from cic_eth.task import CriticalSQLAlchemyTask
celery_app = celery.current_app
@celery_app.task()
def out_tmp(tag, txt):
f = open('/tmp/err.{}.txt'.format(tag), "w")
f.write(txt)
f.close()
@celery_app.task(base=CriticalSQLAlchemyTask)
def alert(chained_input, tag, txt):
session = SessionBase.create_session()
o = Debug(tag, txt)
session.add(o)
session.commit()
session.close()
return chained_input

View File

@@ -489,8 +489,9 @@ class Api:
],
queue=self.queue,
)
s_local.link(s_brief)
if self.callback_param != None:
s_assemble.link(self.callback_success).on_error(self.callback_error)
s_brief.link(self.callback_success).on_error(self.callback_error)
t = None
if external_task != None:
@@ -515,11 +516,10 @@ class Api:
c = celery.chain(s_external_get, s_external_process)
t = celery.chord([s_local, c])(s_brief)
else:
t = s_local.apply_sync()
t = s_local.apply_async(queue=self.queue)
return t
def ping(self, r):
"""A noop callback ping for testing purposes.

View File

@@ -0,0 +1,32 @@
"""debug output
Revision ID: f738d9962fdf
Revises: ec40ac0974c1
Create Date: 2021-03-04 08:32:43.281214
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'f738d9962fdf'
down_revision = 'ec40ac0974c1'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'debug',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('tag', sa.String, nullable=False),
sa.Column('description', sa.String, nullable=False),
sa.Column('date_created', sa.DateTime, nullable=False),
)
pass
def downgrade():
op.drop_table('debug')
pass

View File

@@ -0,0 +1,32 @@
"""debug output
Revision ID: f738d9962fdf
Revises: ec40ac0974c1
Create Date: 2021-03-04 08:32:43.281214
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'f738d9962fdf'
down_revision = 'ec40ac0974c1'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'debug',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('tag', sa.String, nullable=False),
sa.Column('description', sa.String, nullable=False),
sa.Column('date_created', sa.DateTime, nullable=False),
)
pass
def downgrade():
op.drop_table('debug')
pass

View File

@@ -0,0 +1,23 @@
# standard imports
import datetime
import logging
# external imports
from sqlalchemy import Column, String, DateTime
# local imports
from .base import SessionBase
class Debug(SessionBase):
__tablename__ = 'debug'
date_created = Column(DateTime, default=datetime.datetime.utcnow)
tag = Column(String)
description = Column(String)
def __init__(self, tag, description):
self.tag = tag
self.description = description

View File

@@ -88,7 +88,7 @@ class Nonce(SessionBase):
#session.execute('UNLOCK TABLE nonce')
#conn.close()
session.commit()
session.commit()
# session.commit()
SessionBase.release_session(session)
return nonce

View File

@@ -2,7 +2,7 @@
import datetime
import logging
# third-party imports
# external imports
from sqlalchemy import Column, Enum, String, Integer, DateTime, Text, or_, ForeignKey
from sqlalchemy.ext.hybrid import hybrid_property, hybrid_method

View File

@@ -1,194 +0,0 @@
# standard imports
import logging
# third-party imports
import web3
import celery
from erc20_approval_escrow import TransferApproval
from cic_registry import CICRegistry
from cic_registry.chain import ChainSpec
# local imports
from cic_eth.db.models.tx import TxCache
from cic_eth.db.models.base import SessionBase
from cic_eth.eth import RpcClient
from cic_eth.eth.factory import TxFactory
from cic_eth.eth.task import sign_and_register_tx
from cic_eth.eth.util import unpack_signed_raw_tx
from cic_eth.eth.task import create_check_gas_and_send_task
from cic_eth.error import TokenCountError
celery_app = celery.current_app
logg = logging.getLogger()
contract_function_signatures = {
'request': 'b0addede',
}
class TransferRequestTxFactory(TxFactory):
"""Factory for creating Transfer request transactions using the TransferApproval contract backend
"""
def request(
self,
token_address,
beneficiary_address,
amount,
chain_spec,
):
"""Create a new TransferApproval.request transaction
:param token_address: Token to create transfer request for
:type token_address: str, 0x-hex
:param beneficiary_address: Beneficiary of token transfer
:type beneficiary_address: str, 0x-hex
:param amount: Amount of tokens to transfer
:type amount: number
:param chain_spec: Chain spec
:type chain_spec: cic_registry.chain.ChainSpec
:returns: Transaction in standard Ethereum format
:rtype: dict
"""
transfer_approval = CICRegistry.get_contract(chain_spec, 'TransferApproval', 'TransferAuthorization')
fn = transfer_approval.function('createRequest')
tx_approval_buildable = fn(beneficiary_address, token_address, amount)
transfer_approval_gas = transfer_approval.gas('createRequest')
tx_approval = tx_approval_buildable.buildTransaction({
'from': self.address,
'gas': transfer_approval_gas,
'gasPrice': self.gas_price,
'chainId': chain_spec.chain_id(),
'nonce': self.next_nonce(),
})
return tx_approval
def unpack_transfer_approval_request(data):
"""Verifies that a transaction is an "TransferApproval.request" transaction, and extracts call parameters from it.
:param data: Raw input data from Ethereum transaction.
:type data: str, 0x-hex
:raises ValueError: Function signature does not match AccountRegister.add
:returns: Parsed parameters
:rtype: dict
"""
f = data[2:10]
if f != contract_function_signatures['request']:
raise ValueError('Invalid transfer request data ({})'.format(f))
d = data[10:]
return {
'to': web3.Web3.toChecksumAddress('0x' + d[64-40:64]),
'token': web3.Web3.toChecksumAddress('0x' + d[128-40:128]),
'amount': int(d[128:], 16)
}
@celery_app.task(bind=True)
def transfer_approval_request(self, tokens, holder_address, receiver_address, value, chain_str):
"""Creates a new transfer approval
:param tokens: Token to generate transfer request for
:type tokens: list with single token spec as dict
:param holder_address: Address to generate transfer on behalf of
:type holder_address: str, 0x-hex
:param receiver_address: Address to transfser tokens to
:type receiver_address: str, 0x-hex
:param value: Amount of tokens to transfer
:type value: number
:param chain_spec: Chain spec string representation
:type chain_spec: str
:raises cic_eth.error.TokenCountError: More than one token in tokens argument
:returns: Raw signed transaction
:rtype: list with transaction as only element
"""
if len(tokens) != 1:
raise TokenCountError
chain_spec = ChainSpec.from_chain_str(chain_str)
queue = self.request.delivery_info['routing_key']
t = tokens[0]
c = RpcClient(holder_address)
txf = TransferRequestTxFactory(holder_address, c)
tx_transfer = txf.request(t['address'], receiver_address, value, chain_spec)
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_transfer, chain_str, queue, 'cic_eth.eth.request.otx_cache_transfer_approval_request')
gas_budget = tx_transfer['gas'] * tx_transfer['gasPrice']
s = create_check_gas_and_send_task(
[tx_signed_raw_hex],
chain_str,
holder_address,
gas_budget,
[tx_hash_hex],
queue,
)
s.apply_async()
return [tx_signed_raw_hex]
@celery_app.task()
def otx_cache_transfer_approval_request(
tx_hash_hex,
tx_signed_raw_hex,
chain_str,
):
"""Generates and commits transaction cache metadata for an TransferApproval.request transaction
:param tx_hash_hex: Transaction hash
:type tx_hash_hex: str, 0x-hex
:param tx_signed_raw_hex: Raw signed transaction
:type tx_signed_raw_hex: str, 0x-hex
:param chain_str: Chain spec string representation
:type chain_str: str
:returns: Transaction hash and id of cache element in storage backend, respectively
:rtype: tuple
"""
chain_spec = ChainSpec.from_chain_str(chain_str)
tx_signed_raw_bytes = bytes.fromhex(tx_signed_raw_hex[2:])
tx = unpack_signed_raw_tx(tx_signed_raw_bytes, chain_spec.chain_id())
logg.debug('in otx acche transfer approval request')
(txc, cache_id) = cache_transfer_approval_request_data(tx_hash_hex, tx)
return txc
@celery_app.task()
def cache_transfer_approval_request_data(
tx_hash_hex,
tx,
):
"""Helper function for otx_cache_transfer_approval_request
:param tx_hash_hex: Transaction hash
:type tx_hash_hex: str, 0x-hex
:param tx: Signed raw transaction
:type tx: str, 0x-hex
:returns: Transaction hash and id of cache element in storage backend, respectively
:rtype: tuple
"""
tx_data = unpack_transfer_approval_request(tx['data'])
logg.debug('tx approval request data {}'.format(tx_data))
logg.debug('tx approval request {}'.format(tx))
session = SessionBase.create_session()
tx_cache = TxCache(
tx_hash_hex,
tx['from'],
tx_data['to'],
tx_data['token'],
tx_data['token'],
tx_data['amount'],
tx_data['amount'],
)
session.add(tx_cache)
session.commit()
cache_id = tx_cache.id
session.close()
return (tx_hash_hex, cache_id)

View File

@@ -69,7 +69,6 @@ def check_gas(self, tx_hashes, chain_str, txs=[], address=None, gas_required=Non
for i in range(len(tx_hashes)):
o = get_tx(tx_hashes[i])
txs.append(o['signed_tx'])
logg.debug('ooooo {}'.format(o))
if address == None:
address = o['address']
@@ -178,12 +177,7 @@ class ParityNodeHandler:
def handle(self, exception, tx_hash_hex, tx_hex):
meth = self.handle_default
if isinstance(exception, (ValueError)):
# s_debug = celery.signature(
# 'cic_eth.admin.debug.out_tmp',
# [tx_hash_hex, '{}: {}'.format(tx_hash_hex, exception)],
# queue=queue,
# )
# s_debug.apply_async()
earg = exception.args[0]
if earg['code'] == -32010:
logg.debug('skipping lock for code {}'.format(earg['code']))
@@ -191,14 +185,15 @@ class ParityNodeHandler:
elif earg['code'] == -32602:
meth = self.handle_invalid_encoding
else:
# TODO: move to status log db comment field
meth = self.handle_invalid
elif isinstance(exception, (requests.exceptions.ConnectionError)):
meth = self.handle_connection
(t, e_fn, message) = meth(tx_hash_hex, tx_hex)
(t, e_fn, message) = meth(tx_hash_hex, tx_hex, str(exception))
return (t, e_fn, '{} {}'.format(message, exception))
def handle_connection(self, tx_hash_hex, tx_hex):
def handle_connection(self, tx_hash_hex, tx_hex, debugstr=None):
s_set_sent = celery.signature(
'cic_eth.queue.tx.set_sent_status',
[
@@ -211,7 +206,7 @@ class ParityNodeHandler:
return (t, TemporaryTxError, 'Sendfail {}'.format(tx_hex_string(tx_hex, self.chain_spec.chain_id())))
def handle_invalid_encoding(self, tx_hash_hex, tx_hex):
def handle_invalid_encoding(self, tx_hash_hex, tx_hex, debugstr=None):
tx_bytes = bytes.fromhex(tx_hex[2:])
tx = unpack_signed_raw_tx(tx_bytes, self.chain_spec.chain_id())
s_lock = celery.signature(
@@ -258,7 +253,7 @@ class ParityNodeHandler:
return (t, PermanentTxError, 'Reject invalid encoding {}'.format(tx_hex_string(tx_hex, self.chain_spec.chain_id())))
def handle_invalid_parameters(self, tx_hash_hex, tx_hex):
def handle_invalid_parameters(self, tx_hash_hex, tx_hex, debugstr=None):
s_sync = celery.signature(
'cic_eth.eth.tx.sync_tx',
[
@@ -271,7 +266,7 @@ class ParityNodeHandler:
return (t, PermanentTxError, 'Reject invalid parameters {}'.format(tx_hex_string(tx_hex, self.chain_spec.chain_id())))
def handle_invalid(self, tx_hash_hex, tx_hex):
def handle_invalid(self, tx_hash_hex, tx_hex, debugstr=None):
tx_bytes = bytes.fromhex(tx_hex[2:])
tx = unpack_signed_raw_tx(tx_bytes, self.chain_spec.chain_id())
s_lock = celery.signature(
@@ -289,6 +284,16 @@ class ParityNodeHandler:
[],
queue=self.queue,
)
s_debug = celery.signature(
'cic_eth.admin.debug.alert',
[
tx_hash_hex,
tx_hash_hex,
debugstr,
],
queue=queue,
)
s_set_reject.link(s_debug)
s_lock.link(s_set_reject)
t = s_lock.apply_async()
return (t, PermanentTxError, 'Reject invalid {}'.format(tx_hex_string(tx_hex, self.chain_spec.chain_id())))

View File

@@ -149,6 +149,9 @@ def tx_collate(tx_batches, chain_str, offset, limit, newest_first=True):
txs_by_block = {}
chain_spec = ChainSpec.from_chain_str(chain_str)
if isinstance(tx_batches, dict):
tx_batches = [tx_batches]
for b in tx_batches:
for v in b.values():
tx = None

View File

@@ -30,6 +30,7 @@ from cic_eth.task import CriticalSQLAlchemyTask
from cic_eth.eth.util import unpack_signed_raw_tx # TODO: should not be in same sub-path as package that imports queue.tx
from cic_eth.error import NotLocalTxError
from cic_eth.error import LockedError
from cic_eth.db.enum import status_str
celery_app = celery.current_app
#logg = celery_app.log.get_default_logger()
@@ -405,7 +406,7 @@ def get_tx_cache(tx_hash):
'tx_hash': otx.tx_hash,
'signed_tx': otx.signed_tx,
'nonce': otx.nonce,
'status': StatusEnum(otx.status).name,
'status': status_str(otx.status),
'status_code': otx.status,
'source_token': txc.source_token_address,
'destination_token': txc.destination_token_address,

View File

@@ -2,3 +2,4 @@ from .callback import CallbackFilter
from .tx import TxFilter
from .gas import GasFilter
from .register import RegistrationFilter
from .transferauth import TransferAuthFilter

View File

@@ -1,7 +1,7 @@
# standard imports
import logging
# third-party imports
# external imports
from cic_registry.chain import ChainSpec
from hexathon import add_0x
@@ -24,7 +24,7 @@ class GasFilter(SyncFilter):
self.chain_spec = chain_spec
def filter(self, conn, block, tx, session): #rcpt, chain_str, session=None):
def filter(self, conn, block, tx, session):
tx_hash_hex = add_0x(tx.hash)
if tx.value > 0:
logg.debug('gas refill tx {}'.format(tx_hash_hex))

View File

@@ -0,0 +1,89 @@
# standard imports
import logging
# external imports
import celery
from hexathon import (
strip_0x,
add_0x,
)
from chainlib.eth.address import to_checksum
from .base import SyncFilter
logg = logging.getLogger(__name__)
transfer_request_signature = 'ed71262a'
def unpack_create_request(data):
data = strip_0x(data)
cursor = 0
f = data[cursor:cursor+8]
cursor += 8
if f != transfer_request_signature:
raise ValueError('Invalid create request data ({})'.format(f))
o = {}
o['sender'] = data[cursor+24:cursor+64]
cursor += 64
o['recipient'] = data[cursor+24:cursor+64]
cursor += 64
o['token'] = data[cursor+24:cursor+64]
cursor += 64
o['value'] = int(data[cursor:], 16)
return o
class TransferAuthFilter(SyncFilter):
def __init__(self, registry, chain_spec, queue=None):
self.queue = queue
self.chain_spec = chain_spec
self.transfer_request_contract = registry.get_contract(self.chain_spec, 'TransferAuthorization')
def filter(self, conn, block, tx, session): #rcpt, chain_str, session=None):
if tx.payload == None:
logg.debug('no payload')
return False
payloadlength = len(tx.payload)
if payloadlength != 8+256:
logg.debug('{} below minimum length for a transfer auth call'.format(payloadlength))
logg.debug('payload {}'.format(tx.payload))
return False
recipient = tx.inputs[0]
if recipient != self.transfer_request_contract.address():
logg.debug('not our transfer auth contract address {}'.format(recipient))
return False
o = unpack_create_request(tx.payload)
sender = add_0x(to_checksum(o['sender']))
recipient = add_0x(to_checksum(recipient))
token = add_0x(to_checksum(o['token']))
s = celery.signature(
'cic_eth.eth.token.approve',
[
[
{
'address': token,
},
],
sender,
recipient,
o['value'],
str(self.chain_spec),
],
queue=self.queue,
)
t = s.apply_async()
return True
def __str__(self):
return 'cic-eth transfer auth filter'

View File

@@ -55,62 +55,25 @@ SessionBase.connect(dsn)
celery_app = celery.Celery(backend=config.get('CELERY_RESULT_URL'), broker=config.get('CELERY_BROKER_URL'))
queue = args.q
re_transfer_approval_request = r'^/transferrequest/?'
re_something = r'^/something/?'
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
def process_transfer_approval_request(session, env):
r = re.match(re_transfer_approval_request, env.get('PATH_INFO'))
def process_something(session, env):
r = re.match(re_something, env.get('PATH_INFO'))
if not r:
return None
if env.get('CONTENT_TYPE') != 'application/json':
raise AttributeError('content type')
#if env.get('CONTENT_TYPE') != 'application/json':
# raise AttributeError('content type')
if env.get('REQUEST_METHOD') != 'POST':
raise AttributeError('method')
#if env.get('REQUEST_METHOD') != 'POST':
# raise AttributeError('method')
post_data = json.load(env.get('wsgi.input'))
token_address = web3.Web3.toChecksumAddress(post_data['token_address'])
holder_address = web3.Web3.toChecksumAddress(post_data['holder_address'])
beneficiary_address = web3.Web3.toChecksumAddress(post_data['beneficiary_address'])
value = int(post_data['value'])
logg.debug('transfer approval request token {} to {} from {} value {}'.format(
token_address,
beneficiary_address,
holder_address,
value,
)
)
s = celery.signature(
'cic_eth.eth.request.transfer_approval_request',
[
[
{
'address': token_address,
},
],
holder_address,
beneficiary_address,
value,
config.get('CIC_CHAIN_SPEC'),
],
queue=queue,
)
t = s.apply_async()
r = t.get()
tx_raw_bytes = bytes.fromhex(r[0][2:])
tx = unpack_signed_raw_tx(tx_raw_bytes, chain_spec.chain_id())
for r in t.collect():
logg.debug('result {}'.format(r))
if not t.successful():
raise RuntimeError(tx['hash'])
return ('text/plain', tx['hash'].encode('utf-8'),)
#post_data = json.load(env.get('wsgi.input'))
#return ('text/plain', 'foo'.encode('utf-8'),)
# uwsgi application
@@ -125,7 +88,7 @@ def application(env, start_response):
session = SessionBase.create_session()
for handler in [
process_transfer_approval_request,
process_something,
]:
try:
r = handler(session, env)

View File

@@ -26,7 +26,6 @@ from cic_eth.eth import bancor
from cic_eth.eth import token
from cic_eth.eth import tx
from cic_eth.eth import account
from cic_eth.eth import request
from cic_eth.admin import debug
from cic_eth.admin import ctrl
from cic_eth.eth.rpc import RpcClient
@@ -40,6 +39,7 @@ from cic_eth.callbacks import redis
from cic_eth.db.models.base import SessionBase
from cic_eth.db.models.otx import Otx
from cic_eth.db import dsn_from_config
from cic_eth.ext import tx
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()

View File

@@ -58,6 +58,7 @@ from cic_eth.runnable.daemons.filters import (
GasFilter,
TxFilter,
RegistrationFilter,
TransferAuthFilter,
)
script_dir = os.path.realpath(os.path.dirname(__file__))
@@ -146,6 +147,8 @@ def main():
gas_filter = GasFilter(chain_spec, config.get('_CELERY_QUEUE'))
transfer_auth_filter = TransferAuthFilter(registry, chain_spec, config.get('_CELERY_QUEUE'))
i = 0
for syncer in syncers:
logg.debug('running syncer index {}'.format(i))
@@ -153,6 +156,7 @@ def main():
syncer.add_filter(registration_filter)
# TODO: the two following filter functions break the filter loop if return uuid. Pro: less code executed. Con: Possibly unintuitive flow break
syncer.add_filter(tx_filter)
syncer.add_filter(transfer_auth_filter)
for cf in callback_filters:
syncer.add_filter(cf)

View File

@@ -23,8 +23,11 @@ from hexathon import add_0x
# local imports
from cic_eth.api import AdminApi
from cic_eth.eth.rpc import RpcClient
from cic_eth.db.enum import StatusEnum
from cic_eth.db.enum import LockEnum
from cic_eth.db.enum import (
StatusEnum,
status_str,
LockEnum,
)
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
@@ -119,7 +122,7 @@ def render_tx(o, **kwargs):
for v in o.get('status_log', []):
d = datetime.datetime.fromisoformat(v[0])
e = StatusEnum(v[1]).name
e = status_str(v[1])
content += '{}: {}\n'.format(d, e)
return content

View File

@@ -10,7 +10,7 @@ version = (
0,
10,
0,
'alpha.36',
'alpha.38',
)
version_object = semver.VersionInfo(

View File

@@ -1,6 +1,7 @@
cic-base~=0.1.1a10
web3==5.12.2
celery==4.4.7
crypto-dev-signer~=0.4.13rc3
crypto-dev-signer~=0.4.13rc4
confini~=0.3.6rc3
cic-registry~=0.5.3a22
cic-bancor~=0.0.6
@@ -9,7 +10,7 @@ alembic==1.4.2
websockets==8.1
requests~=2.24.0
eth_accounts_index~=0.0.10a10
erc20-approval-escrow~=0.3.0a5
erc20-transfer-authorization~=0.3.0a10
erc20-single-shot-faucet~=0.2.0a6
rlp==2.0.1
uWSGI==2.0.19.1
@@ -21,4 +22,3 @@ eth-address-index~=0.1.0a8
chainlib~=0.0.1a19
hexathon~=0.0.1a3
chainsyncer~=0.0.1a19
cic-base==0.1.1a10

View File

@@ -13,13 +13,13 @@ def celery_includes():
return [
'cic_eth.eth.bancor',
'cic_eth.eth.token',
'cic_eth.eth.request',
'cic_eth.eth.tx',
'cic_eth.ext.tx',
'cic_eth.queue.tx',
'cic_eth.queue.balance',
'cic_eth.admin.ctrl',
'cic_eth.admin.nonce',
'cic_eth.admin.debug',
'cic_eth.eth.account',
'cic_eth.callbacks.noop',
'cic_eth.callbacks.http',

View File

@@ -0,0 +1,29 @@
# external imports
import celery
# local imports
from cic_eth.db.models.debug import Debug
def test_debug_alert(
init_database,
celery_session_worker,
):
s = celery.signature(
'cic_eth.admin.debug.alert',
[
'foo',
'bar',
'baz',
],
queue=None,
)
t = s.apply_async()
r = t.get()
assert r == 'foo'
q = init_database.query(Debug)
q = q.filter(Debug.tag=='bar')
o = q.first()
assert o.description == 'baz'

View File

@@ -1,76 +0,0 @@
# standard imports
import logging
import time
# third-party imports
from erc20_approval_escrow import TransferApproval
import celery
import sha3
# local imports
from cic_eth.eth.token import TokenTxFactory
logg = logging.getLogger()
# BUG: transaction receipt only found sometimes
def test_transfer_approval(
default_chain_spec,
transfer_approval,
bancor_tokens,
w3_account_roles,
eth_empty_accounts,
cic_registry,
init_database,
celery_session_worker,
init_eth_tester,
init_w3,
):
s = celery.signature(
'cic_eth.eth.request.transfer_approval_request',
[
[
{
'address': bancor_tokens[0],
},
],
w3_account_roles['eth_account_sarafu_owner'],
eth_empty_accounts[0],
1024,
str(default_chain_spec),
],
)
s_send = celery.signature(
'cic_eth.eth.tx.send',
[
str(default_chain_spec),
],
)
s.link(s_send)
t = s.apply_async()
tx_signed_raws = t.get()
for r in t.collect():
logg.debug('result {}'.format(r))
assert t.successful()
init_eth_tester.mine_block()
h = sha3.keccak_256()
tx_signed_raw = tx_signed_raws[0]
tx_signed_raw_bytes = bytes.fromhex(tx_signed_raw[2:])
h.update(tx_signed_raw_bytes)
tx_hash = h.digest()
rcpt = init_w3.eth.getTransactionReceipt(tx_hash)
assert rcpt.status == 1
a = TransferApproval(init_w3, transfer_approval)
assert a.last_serial() == 1
logg.debug('requests {}'.format(a.requests(1)['serial']))

View File

@@ -0,0 +1,16 @@
# local imports
from cic_eth.db.models.debug import Debug
def test_debug(
init_database,
):
o = Debug('foo', 'bar')
init_database.add(o)
init_database.commit()
q = init_database.query(Debug)
q = q.filter(Debug.tag=='foo')
o = q.first()
assert o.description == 'bar'

View File

@@ -7,10 +7,8 @@ PASSWORD_PEPPER=QYbzKff6NhiQzY3ygl2BkiKOpER8RE/Upqs/5aZWW+I=
SERVICE_CODE=*483*46#
[ussd]
MENU_FILE=/usr/local/lib/python3.8/site-packages/cic_ussd/db/ussd_menu.json
MENU_FILE=/usr/src/data/ussd_menu.json
[statemachine]
STATES=/usr/src/cic-ussd/states/
TRANSITIONS=/usr/src/cic-ussd/transitions/

View File

@@ -1,2 +1,5 @@
[cic]
chain_spec = Bloxberg:8995
engine = evm
common_name = bloxberg
network_id = 8996
meta_url = http://localhost:63380

View File

@@ -1,7 +1,7 @@
[database]
NAME=cic_ussd
USER=postgres
PASSWORD=password
PASSWORD=
HOST=localhost
PORT=5432
ENGINE=postgresql

View File

@@ -0,0 +1,5 @@
[pgp]
export_dir = /usr/src/pgp/keys/
keys_path = /usr/src/secrets/
private_keys = privatekeys_meta.asc
passphrase =

View File

@@ -7,8 +7,8 @@ PASSWORD_PEPPER=QYbzKff6NhiQzY3ygl2BkiKOpER8RE/Upqs/5aZWW+I=
SERVICE_CODE=*483*46#
[ussd]
MENU_FILE=cic_ussd/db/ussd_menu.json
MENU_FILE=/usr/local/lib/python3.8/site-packages/cic_ussd/db/ussd_menu.json
[statemachine]
STATES=states/
TRANSITIONS=transitions/
STATES=/usr/src/cic-ussd/states/
TRANSITIONS=/usr/src/cic-ussd/transitions/

View File

@@ -1,2 +1,5 @@
[cic]
chain_spec = Bloxberg:8995
engine = evm
common_name = bloxberg
network_id = 8996
meta_url = http://localhost:63380

View File

@@ -0,0 +1,5 @@
[pgp]
export_dir = /usr/src/pgp/keys/
keys_path = /usr/src/secrets/
private_keys = privatekeys_meta.asc
passphrase =

View File

@@ -0,0 +1,49 @@
# standard imports
import json
# third-party imports
from cic_eth.api import Api
from cic_types.models.person import Person
from cic_types.processor import generate_metadata_pointer
# local imports
from cic_ussd.chain import Chain
from cic_ussd.db.models.user import User
from cic_ussd.metadata import blockchain_address_to_metadata_pointer
from cic_ussd.redis import get_cached_data
def define_account_tx_metadata(user: User):
# get sender metadata
identifier = blockchain_address_to_metadata_pointer(
blockchain_address=user.blockchain_address
)
key = generate_metadata_pointer(
identifier=identifier,
cic_type='cic.person'
)
account_metadata = get_cached_data(key=key)
if account_metadata:
account_metadata = json.loads(account_metadata)
person = Person()
deserialized_person = person.deserialize(metadata=account_metadata)
given_name = deserialized_person.given_name
family_name = deserialized_person.family_name
phone_number = deserialized_person.tel
return f'{given_name} {family_name} {phone_number}'
else:
phone_number = user.phone_number
return phone_number
def retrieve_account_statement(blockchain_address: str):
chain_str = Chain.spec.__str__()
cic_eth_api = Api(
chain_str=chain_str,
callback_queue='cic-ussd',
callback_task='cic_ussd.tasks.callback_handler.process_statement_callback',
callback_param=blockchain_address
)
result = cic_eth_api.list(address=blockchain_address, limit=9)

View File

@@ -1,39 +0,0 @@
# standard imports
import logging
from collections import deque
# third-party imports
from cic_eth.api import Api
# local imports
from cic_ussd.transactions import from_wei
logg = logging.getLogger()
class BalanceManager:
def __init__(self, address: str, chain_str: str, token_symbol: str):
"""
:param address: Ethereum address of account whose balance is being queried
:type address: str, 0x-hex
:param chain_str: The chain name and network id.
:type chain_str: str
:param token_symbol: ERC20 token symbol of whose balance is being queried
:type token_symbol: str
"""
self.address = address
self.chain_str = chain_str
self.token_symbol = token_symbol
def get_operational_balance(self) -> float:
"""This question queries cic-eth for an account's balance
:return: The current balance of the account as reflected on the blockchain.
:rtype: int
"""
cic_eth_api = Api(chain_str=self.chain_str, callback_task=None)
balance_request_task = cic_eth_api.balance(address=self.address, token_symbol=self.token_symbol)
balance_request_task_results = balance_request_task.collect()
balance_result = deque(balance_request_task_results, maxlen=1).pop()
balance = from_wei(value=balance_result[-1])
return balance

View File

@@ -0,0 +1,90 @@
# standard imports
import json
import logging
from typing import Union
# third-party imports
import celery
from cic_eth.api import Api
# local imports
from cic_ussd.error import CachedDataNotFoundError
from cic_ussd.redis import create_cached_data_key, get_cached_data
from cic_ussd.conversions import from_wei
logg = logging.getLogger()
class BalanceManager:
def __init__(self, address: str, chain_str: str, token_symbol: str):
"""
:param address: Ethereum address of account whose balance is being queried
:type address: str, 0x-hex
:param chain_str: The chain name and network id.
:type chain_str: str
:param token_symbol: ERC20 token symbol of whose balance is being queried
:type token_symbol: str
"""
self.address = address
self.chain_str = chain_str
self.token_symbol = token_symbol
def get_balances(self, asynchronous: bool = False) -> Union[celery.Task, dict]:
"""
This function queries cic-eth for an account's balances, It provides a means to receive the balance either
asynchronously or synchronously depending on the provided value for teh asynchronous parameter. It returns a
dictionary containing network, outgoing and incoming balances.
:param asynchronous: Boolean value checking whether to return balances asynchronously
:type asynchronous: bool
:return:
:rtype:
"""
if asynchronous:
cic_eth_api = Api(
chain_str=self.chain_str,
callback_queue='cic-ussd',
callback_task='cic_ussd.tasks.callback_handler.process_balances_callback',
callback_param=''
)
cic_eth_api.balance(address=self.address, token_symbol=self.token_symbol)
else:
cic_eth_api = Api(chain_str=self.chain_str)
balance_request_task = cic_eth_api.balance(
address=self.address,
token_symbol=self.token_symbol)
return balance_request_task.get()[0]
def compute_operational_balance(balances: dict) -> float:
"""This function calculates the right balance given incoming and outgoing
:param balances:
:type balances:
:return:
:rtype:
"""
incoming_balance = balances.get('balance_incoming')
outgoing_balance = balances.get('balance_outgoing')
network_balance = balances.get('balance_network')
operational_balance = (network_balance + incoming_balance) - outgoing_balance
return from_wei(value=operational_balance)
def get_cached_operational_balance(blockchain_address: str):
"""
:param blockchain_address:
:type blockchain_address:
:return:
:rtype:
"""
key = create_cached_data_key(
identifier=bytes.fromhex(blockchain_address[2:]),
salt='cic.balances_data'
)
cached_balance = get_cached_data(key=key)
if cached_balance:
operational_balance = compute_operational_balance(balances=json.loads(cached_balance))
return operational_balance
else:
raise CachedDataNotFoundError('Cached operational balance not found.')

View File

@@ -0,0 +1,10 @@
# local imports
# third-party imports
from chainlib.chain import ChainSpec
# local imports
class Chain:
spec: ChainSpec = None

View File

@@ -0,0 +1,41 @@
# standard imports
import decimal
# third-party imports
# local imports
def truncate(value: float, decimals: int):
"""This function truncates a value to a specified number of decimals places.
:param value: The value to be truncated.
:type value: float
:param decimals: The number of decimals for the value to be truncated to
:type decimals: int
:return: The truncated value.
:rtype: int
"""
decimal.getcontext().rounding = decimal.ROUND_DOWN
contextualized_value = decimal.Decimal(value)
return round(contextualized_value, decimals)
def from_wei(value: int) -> float:
"""This function converts values in Wei to a token in the cic network.
:param value: Value in Wei
:type value: int
:return: SRF equivalent of value in Wei
:rtype: float
"""
value = float(value) / 1e+6
return truncate(value=value, decimals=2)
def to_wei(value: int) -> int:
"""This functions converts values from a token in the cic network to Wei.
:param value: Value in SRF
:type value: int
:return: Wei equivalent of value in SRF
:rtype: int
"""
return int(value * 1e+6)

View File

@@ -1,213 +1,237 @@
{
"ussd_menu": {
"1": {
"description": "The self signup process has been initiated and the account is being created",
"display_key": "ussd.kenya.account_creation_prompt",
"name": "account_creation_prompt",
"parent": null
},
"2": {
"description": "Start menu. This is the entry point for users to select their preferred language",
"description": "Entry point for users to select their preferred language.",
"display_key": "ussd.kenya.initial_language_selection",
"name": "initial_language_selection",
"parent": null
},
"3": {
"description": "PIN setup entry menu",
"2": {
"description": "Entry point for users to enter a pin to secure their account.",
"display_key": "ussd.kenya.initial_pin_entry",
"name": "initial_pin_entry",
"parent": "initial_language_selection"
"parent": null
},
"4": {
"description": "Confirm new PIN menu",
"3": {
"description": "Pin confirmation entry menu.",
"display_key": "ussd.kenya.initial_pin_confirmation",
"name": "initial_pin_confirmation",
"parent": "initial_pin_entry"
},
"4": {
"description": "The signup process has been initiated and the account is being created.",
"display_key": "ussd.kenya.account_creation_prompt",
"name": "account_creation_prompt",
"parent": null
},
"5": {
"description": "Start menu. This is the entry point for activated users",
"description": "Entry point for activated users.",
"display_key": "ussd.kenya.start",
"name": "start",
"parent": null
},
"6": {
"description": "Send Token recipient entry",
"description": "Given name entry menu.",
"display_key": "ussd.kenya.enter_given_name",
"name": "enter_given_name",
"parent": "metadata_management"
},
"7": {
"description": "Family name entry menu.",
"display_key": "ussd.kenya.enter_family_name",
"name": "enter_family_name",
"parent": "metadata_management"
},
"8": {
"description": "Gender entry menu.",
"display_key": "ussd.kenya.enter_gender",
"name": "enter_gender",
"parent": "metadata_management"
},
"9": {
"description": "Age entry menu.",
"display_key": "ussd.kenya.enter_gender",
"name": "enter_gender",
"parent": "metadata_management"
},
"10": {
"description": "Location entry menu.",
"display_key": "ussd.kenya.enter_location",
"name": "enter_location",
"parent": "metadata_management"
},
"11": {
"description": "Products entry menu.",
"display_key": "ussd.kenya.enter_products",
"name": "enter_products",
"parent": "metadata_management"
},
"12": {
"description": "Entry point for activated users.",
"display_key": "ussd.kenya.start",
"name": "start",
"parent": null
},
"13": {
"description": "Send Token recipient entry.",
"display_key": "ussd.kenya.enter_transaction_recipient",
"name": "enter_transaction_recipient",
"parent": "start"
},
"7": {
"description": "Send Token amount prompt menu",
"14": {
"description": "Send Token amount prompt menu.",
"display_key": "ussd.kenya.enter_transaction_amount",
"name": "enter_transaction_amount",
"parent": "start"
},
"8": {
"description": "PIN entry for authorization to send token",
"15": {
"description": "Pin entry for authorization to send token.",
"display_key": "ussd.kenya.transaction_pin_authorization",
"name": "transaction_pin_authorization",
"parent": "start"
},
"9": {
"description": "Terminal of a menu flow where an SMS is expected after.",
"display_key": "ussd.kenya.complete",
"name": "complete",
"parent": null
},
"10": {
"description": "Help menu",
"display_key": "ussd.kenya.help",
"name": "help",
"16": {
"description": "Manage account menu.",
"display_key": "ussd.kenya.account_management",
"name": "account_management",
"parent": "start"
},
"11": {
"description": "Manage account menu",
"display_key": "ussd.kenya.profile_management",
"name": "profile_management",
"17": {
"description": "Manage metadata menu.",
"display_key": "ussd.kenya.metadata_management",
"name": "metadata_management",
"parent": "start"
},
"12": {
"description": "Manage business directory info",
"18": {
"description": "Manage user's preferred language menu.",
"display_key": "ussd.kenya.select_preferred_language",
"name": "select_preferred_language",
"parent": "account_management"
},
"13": {
"description": "About business directory info",
"19": {
"description": "Retrieve mini-statement menu.",
"display_key": "ussd.kenya.mini_statement_pin_authorization",
"name": "mini_statement_pin_authorization",
"parent": "account_management"
},
"14": {
"description": "Change business directory info",
"20": {
"description": "Manage user's pin menu.",
"display_key": "ussd.kenya.enter_current_pin",
"name": "enter_current_pin",
"parent": "account_management"
},
"15": {
"description": "New PIN entry menu",
"21": {
"description": "New pin entry menu.",
"display_key": "ussd.kenya.enter_new_pin",
"name": "enter_new_pin",
"parent": "account_management"
},
"16": {
"description": "First name entry menu",
"display_key": "ussd.kenya.enter_first_name",
"name": "enter_first_name",
"parent": "profile_management"
},
"17": {
"description": "Last name entry menu",
"display_key": "ussd.kenya.enter_last_name",
"name": "enter_last_name",
"parent": "profile_management"
},
"18": {
"description": "Gender entry menu",
"display_key": "ussd.kenya.enter_gender",
"name": "enter_gender",
"parent": "profile_management"
},
"19": {
"description": "Location entry menu",
"display_key": "ussd.kenya.enter_location",
"name": "enter_location",
"parent": "profile_management"
},
"20": {
"description": "Business profile entry menu",
"display_key": "ussd.kenya.enter_business_profile",
"name": "enter_business_profile",
"parent": "profile_management"
},
"21": {
"description": "Menu to display a user's entire profile",
"display_key": "ussd.kenya.display_user_profile_data",
"name": "display_user_profile_data",
"parent": "profile_management"
},
"22": {
"description": "Pin authorization to change name",
"display_key": "ussd.kenya.name_management_pin_authorization",
"name": "name_management_pin_authorization",
"parent": "profile_management"
"description": "Pin entry menu.",
"display_key": "ussd.kenya.standard_pin_authorization",
"name": "standard_pin_authorization",
"parent": "start"
},
"23": {
"description": "Pin authorization to change gender",
"display_key": "ussd.kenya.gender_management_pin_authorization",
"name": "gender_management_pin_authorization",
"parent": "profile_management"
},
"24": {
"description": "Pin authorization to change location",
"display_key": "ussd.kenya.location_management_pin_authorization",
"name": "location_management_pin_authorization",
"parent": "profile_management"
},
"26": {
"description": "Pin authorization to display user's profile",
"display_key": "ussd.kenya.view_profile_pin_authorization",
"name": "view_profile_pin_authorization",
"parent": "profile_management"
},
"27": {
"description": "Exit menu",
"description": "Exit menu.",
"display_key": "ussd.kenya.exit",
"name": "exit",
"parent": null
},
"28": {
"description": "Invalid menu option",
"24": {
"description": "Invalid menu option.",
"display_key": "ussd.kenya.exit_invalid_menu_option",
"name": "exit_invalid_menu_option",
"parent": null
},
"29": {
"description": "PIN policy violation",
"25": {
"description": "Pin policy violation.",
"display_key": "ussd.kenya.exit_invalid_pin",
"name": "exit_invalid_pin",
"parent": null
},
"30": {
"description": "PIN mismatch. New PIN and the new PIN confirmation do not match",
"26": {
"description": "Pin mismatch. New pin and the new pin confirmation do not match",
"display_key": "ussd.kenya.exit_pin_mismatch",
"name": "exit_pin_mismatch",
"parent": null
},
"31": {
"description": "Ussd PIN Blocked Menu",
"27": {
"description": "Ussd pin blocked Menu",
"display_key": "ussd.kenya.exit_pin_blocked",
"name": "exit_pin_blocked",
"parent": null
},
"32": {
"description": "Key params missing in request",
"28": {
"description": "Key params missing in request.",
"display_key": "ussd.kenya.exit_invalid_request",
"name": "exit_invalid_request",
"parent": null
},
"33": {
"description": "The user did not select a choice",
"29": {
"description": "The user did not select a choice.",
"display_key": "ussd.kenya.exit_invalid_input",
"name": "exit_invalid_input",
"parent": null
},
"34": {
"30": {
"description": "Exit following unsuccessful transaction due to insufficient account balance.",
"display_key": "ussd.kenya.exit_insufficient_balance",
"name": "exit_insufficient_balance",
"parent": null
},
"31": {
"description": "Exit following a successful transaction.",
"display_key": "ussd.kenya.exit_successful_transaction",
"name": "exit_successful_transaction",
"parent": null
},
"32": {
"description": "End of a menu flow.",
"display_key": "ussd.kenya.complete",
"name": "complete",
"parent": null
},
"33": {
"description": "Pin entry menu to view account balances.",
"display_key": "ussd.kenya.account_balances_pin_authorization",
"name": "account_balances_pin_authorization",
"parent": "account_management"
},
"34": {
"description": "Pin entry menu to view account statement.",
"display_key": "ussd.kenya.account_statement_pin_authorization",
"name": "account_statement_pin_authorization",
"parent": "account_management"
},
"35": {
"description": "Manage account menu",
"display_key": "ussd.kenya.account_management",
"name": "account_management",
"parent": "start"
"description": "Menu to display account balances.",
"display_key": "ussd.kenya.account_balances",
"name": "account_balances",
"parent": "account_management"
},
"36": {
"description": "Exit following insufficient balance to perform a transaction.",
"display_key": "ussd.kenya.exit_insufficient_balance",
"name": "exit_insufficient_balance",
"description": "Menu to display first set of transactions in statement.",
"display_key": "ussd.kenya.first_transaction_set",
"name": "first_transaction_set",
"parent": null
},
"37": {
"description": "Menu to display middle set of transactions in statement.",
"display_key": "ussd.kenya.middle_transaction_set",
"name": "middle_transaction_set",
"parent": null
},
"38": {
"description": "Menu to display last set of transactions in statement.",
"display_key": "ussd.kenya.last_transaction_set",
"name": "last_transaction_set",
"parent": null
},
"39": {
"description": "Menu to instruct users to call the office.",
"display_key": "ussd.key.help",
"name": "help",
"parent": null
}
}

View File

@@ -17,3 +17,17 @@ class ActionDataNotFoundError(OSError):
"""Raised when action data matching a specific task uuid is not found in the redis cache"""
pass
class UserMetadataNotFoundError(OSError):
"""Raised when metadata is expected but not available in cache."""
pass
class UnsupportedMethodError(OSError):
"""Raised when the method passed to the make request function is unsupported."""
pass
class CachedDataNotFoundError(OSError):
"""Raised when the method passed to the make request function is unsupported."""
pass

View File

@@ -0,0 +1,43 @@
# standard imports
# third-party imports
import requests
from chainlib.eth.address import to_checksum
from hexathon import add_0x
# local imports
from cic_ussd.error import UnsupportedMethodError
def make_request(method: str, url: str, data: any = None, headers: dict = None):
"""
:param method:
:type method:
:param url:
:type url:
:param data:
:type data:
:param headers:
:type headers:
:return:
:rtype:
"""
if method == 'GET':
result = requests.get(url=url)
elif method == 'POST':
result = requests.post(url=url, data=data, headers=headers)
elif method == 'PUT':
result = requests.put(url=url, data=data, headers=headers)
else:
raise UnsupportedMethodError(f'Unsupported method: {method}')
return result
def blockchain_address_to_metadata_pointer(blockchain_address: str):
"""
:param blockchain_address:
:type blockchain_address:
:return:
:rtype:
"""
return bytes.fromhex(blockchain_address[2:])

View File

@@ -0,0 +1,63 @@
# standard imports
import json
import logging
from typing import Optional
from urllib.request import Request, urlopen
# third-party imports
import gnupg
# local imports
logg = logging.getLogger()
class Signer:
"""
:cvar gpg_path:
:type gpg_path:
:cvar gpg_passphrase:
:type gpg_passphrase:
:cvar key_file_path:
:type key_file_path:
"""
gpg_path: str = None
gpg_passphrase: str = None
key_file_path: str = None
def __init__(self):
self.gpg = gnupg.GPG(gnupghome=self.gpg_path)
# parse key file data
key_file = open(self.key_file_path, 'r')
self.key_data = key_file.read()
key_file.close()
def get_operational_key(self):
"""
:return:
:rtype:
"""
# import key data into keyring
self.gpg.import_keys(key_data=self.key_data)
gpg_keys = self.gpg.list_keys()
key_algorithm = gpg_keys[0].get('algo')
key_id = gpg_keys[0].get("keyid")
logg.info(f'using signing key: {key_id}, algorithm: {key_algorithm}')
return gpg_keys[0]
def sign_digest(self, data: bytes):
"""
:param data:
:type data:
:return:
:rtype:
"""
data = json.loads(data)
digest = data['digest']
key_id = self.get_operational_key().get('keyid')
signature = self.gpg.sign(digest, passphrase=self.gpg_passphrase, keyid=key_id)
return str(signature)

View File

@@ -0,0 +1,102 @@
# standard imports
import json
import logging
import os
# third-party imports
import requests
from cic_types.models.person import generate_metadata_pointer, Person
# local imports
from cic_ussd.chain import Chain
from cic_ussd.metadata import make_request
from cic_ussd.metadata.signer import Signer
from cic_ussd.redis import cache_data
logg = logging.getLogger()
class UserMetadata:
"""
:cvar base_url:
:type base_url:
"""
base_url = None
def __init__(self, identifier: bytes):
"""
:param identifier:
:type identifier:
"""
self. headers = {
'X-CIC-AUTOMERGE': 'server',
'Content-Type': 'application/json'
}
self.identifier = identifier
self.metadata_pointer = generate_metadata_pointer(
identifier=self.identifier,
cic_type='cic.person'
)
if self.base_url:
self.url = os.path.join(self.base_url, self.metadata_pointer)
def create(self, data: dict):
try:
data = json.dumps(data).encode('utf-8')
result = make_request(method='POST', url=self.url, data=data, headers=self.headers)
metadata = result.content
self.edit(data=metadata, engine='pgp')
logg.info(f'Get sign material response status: {result.status_code}')
result.raise_for_status()
except requests.exceptions.HTTPError as error:
raise RuntimeError(error)
def edit(self, data: bytes, engine: str):
"""
:param data:
:type data:
:param engine:
:type engine:
:return:
:rtype:
"""
cic_meta_signer = Signer()
signature = cic_meta_signer.sign_digest(data=data)
algorithm = cic_meta_signer.get_operational_key().get('algo')
formatted_data = {
'm': data.decode('utf-8'),
's': {
'engine': engine,
'algo': algorithm,
'data': signature,
'digest': json.loads(data).get('digest'),
}
}
formatted_data = json.dumps(formatted_data).encode('utf-8')
try:
result = make_request(method='PUT', url=self.url, data=formatted_data, headers=self.headers)
logg.info(f'Signed content submission status: {result.status_code}.')
result.raise_for_status()
except requests.exceptions.HTTPError as error:
raise RuntimeError(error)
def query(self):
result = make_request(method='GET', url=self.url)
status = result.status_code
logg.info(f'Get latest data status: {status}')
try:
if status == 200:
response_data = result.content
data = json.loads(response_data.decode())
# validate data
person = Person()
deserialized_person = person.deserialize(metadata=json.loads(data))
cache_data(key=self.metadata_pointer, data=json.dumps(deserialized_person.serialize()))
elif status == 404:
logg.info('The data is not available and might need to be added.')
result.raise_for_status()
except requests.exceptions.HTTPError as error:
raise RuntimeError(error)

View File

@@ -5,7 +5,6 @@ import logging
# third party imports
import celery
import i18n
import phonenumbers
from cic_eth.api.api_task import Api
from tinydb.table import Document
from typing import Optional
@@ -239,7 +238,7 @@ def persist_session_to_db_task(external_session_id: str, queue: str):
:type queue: str
"""
s_persist_session_to_db = celery.signature(
'cic_ussd.tasks.ussd.persist_session_to_db',
'cic_ussd.tasks.ussd_session.persist_session_to_db',
[external_session_id]
)
s_persist_session_to_db.apply_async(queue=queue)
@@ -453,37 +452,3 @@ def save_to_in_memory_ussd_session_data(queue: str, session_data: dict, ussd_ses
)
persist_session_to_db_task(external_session_id=external_session_id, queue=queue)
def process_phone_number(phone_number: str, region: str):
"""This function parses any phone number for the provided region
:param phone_number: A string with a phone number.
:type phone_number: str
:param region: Caller defined region
:type region: str
:return: The parsed phone number value based on the defined region
:rtype: str
"""
if not isinstance(phone_number, str):
try:
phone_number = str(int(phone_number))
except ValueError:
pass
phone_number_object = phonenumbers.parse(phone_number, region)
parsed_phone_number = phonenumbers.format_number(phone_number_object, phonenumbers.PhoneNumberFormat.E164)
return parsed_phone_number
def get_user_by_phone_number(phone_number: str) -> Optional[User]:
"""This function queries the database for a user based on the provided phone number.
:param phone_number: A valid phone number.
:type phone_number: str
:return: A user object matching a given phone number
:rtype: User|None
"""
# consider adding region to user's metadata
phone_number = process_phone_number(phone_number=phone_number, region='KE')
user = User.session.query(User).filter_by(phone_number=phone_number).first()
return user

View File

@@ -0,0 +1,43 @@
# standard imports
from typing import Optional
# third-party imports
import phonenumbers
# local imports
from cic_ussd.db.models.user import User
def process_phone_number(phone_number: str, region: str):
"""This function parses any phone number for the provided region
:param phone_number: A string with a phone number.
:type phone_number: str
:param region: Caller defined region
:type region: str
:return: The parsed phone number value based on the defined region
:rtype: str
"""
if not isinstance(phone_number, str):
try:
phone_number = str(int(phone_number))
except ValueError:
pass
phone_number_object = phonenumbers.parse(phone_number, region)
parsed_phone_number = phonenumbers.format_number(phone_number_object, phonenumbers.PhoneNumberFormat.E164)
return parsed_phone_number
def get_user_by_phone_number(phone_number: str) -> Optional[User]:
"""This function queries the database for a user based on the provided phone number.
:param phone_number: A valid phone number.
:type phone_number: str
:return: A user object matching a given phone number
:rtype: User|None
"""
# consider adding region to user's metadata
phone_number = process_phone_number(phone_number=phone_number, region='KE')
user = User.session.query(User).filter_by(phone_number=phone_number).first()
return user

View File

@@ -1,17 +1,26 @@
# standard imports
import logging
import json
import re
from typing import Optional
# third party imports
import celery
from cic_types.models.person import Person
from tinydb.table import Document
# local imports
from cic_ussd.accounts import BalanceManager
from cic_ussd.account import define_account_tx_metadata, retrieve_account_statement
from cic_ussd.balance import BalanceManager, compute_operational_balance, get_cached_operational_balance
from cic_ussd.chain import Chain
from cic_ussd.db.models.user import AccountStatus, User
from cic_ussd.db.models.ussd_session import UssdSession
from cic_ussd.menu.ussd_menu import UssdMenu
from cic_ussd.metadata import blockchain_address_to_metadata_pointer
from cic_ussd.phone_number import get_user_by_phone_number
from cic_ussd.redis import cache_data, create_cached_data_key, get_cached_data
from cic_ussd.state_machine import UssdStateMachine
from cic_ussd.transactions import to_wei, from_wei
from cic_ussd.conversions import to_wei, from_wei
from cic_ussd.translation import translation_for
logg = logging.getLogger(__name__)
@@ -57,17 +66,17 @@ def process_exit_insufficient_balance(display_key: str, user: User, ussd_session
:rtype: str
"""
# get account balance
balance_manager = BalanceManager(address=user.blockchain_address,
chain_str=UssdStateMachine.chain_str,
token_symbol='SRF')
balance = balance_manager.get_operational_balance()
operational_balance = get_cached_operational_balance(blockchain_address=user.blockchain_address)
# compile response data
user_input = ussd_session.get('user_input').split('*')[-1]
transaction_amount = to_wei(value=int(user_input))
token_symbol = 'SRF'
recipient_phone_number = ussd_session.get('session_data').get('recipient_phone_number')
tx_recipient_information = recipient_phone_number
recipient = get_user_by_phone_number(phone_number=recipient_phone_number)
tx_recipient_information = define_account_tx_metadata(user=recipient)
return translation_for(
key=display_key,
@@ -75,7 +84,7 @@ def process_exit_insufficient_balance(display_key: str, user: User, ussd_session
amount=from_wei(transaction_amount),
token_symbol=token_symbol,
recipient_information=tx_recipient_information,
token_balance=balance
token_balance=operational_balance
)
@@ -93,9 +102,9 @@ def process_exit_successful_transaction(display_key: str, user: User, ussd_sessi
transaction_amount = to_wei(int(ussd_session.get('session_data').get('transaction_amount')))
token_symbol = 'SRF'
recipient_phone_number = ussd_session.get('session_data').get('recipient_phone_number')
sender_phone_number = user.phone_number
tx_recipient_information = recipient_phone_number
tx_sender_information = sender_phone_number
recipient = get_user_by_phone_number(phone_number=recipient_phone_number)
tx_recipient_information = define_account_tx_metadata(user=recipient)
tx_sender_information = define_account_tx_metadata(user=user)
return translation_for(
key=display_key,
@@ -122,9 +131,10 @@ def process_transaction_pin_authorization(user: User, display_key: str, ussd_ses
"""
# compile response data
recipient_phone_number = ussd_session.get('session_data').get('recipient_phone_number')
tx_recipient_information = recipient_phone_number
tx_sender_information = user.phone_number
logg.debug('Requires integration with cic-meta to get user name.')
recipient = get_user_by_phone_number(phone_number=recipient_phone_number)
tx_recipient_information = define_account_tx_metadata(user=recipient)
tx_sender_information = define_account_tx_metadata(user=user)
token_symbol = 'SRF'
user_input = ussd_session.get('user_input').split('*')[-1]
transaction_amount = to_wei(value=int(user_input))
@@ -139,6 +149,122 @@ def process_transaction_pin_authorization(user: User, display_key: str, ussd_ses
)
def process_account_balances(user: User, display_key: str, ussd_session: dict):
"""
:param user:
:type user:
:param display_key:
:type display_key:
:param ussd_session:
:type ussd_session:
:return:
:rtype:
"""
# retrieve cached balance
operational_balance = get_cached_operational_balance(blockchain_address=user.blockchain_address)
logg.debug('Requires call to retrieve tax and bonus amounts')
tax = ''
bonus = ''
return translation_for(
key=display_key,
preferred_language=user.preferred_language,
operational_balance=operational_balance,
tax=tax,
bonus=bonus,
token_symbol='SRF'
)
def format_transactions(transactions: list, preferred_language: str):
formatted_transactions = ''
if len(transactions) > 0:
for transaction in transactions:
recipient_phone_number = transaction.get('recipient_phone_number')
sender_phone_number = transaction.get('sender_phone_number')
value = transaction.get('to_value')
timestamp = transaction.get('timestamp')
action_tag = transaction.get('action_tag')
token_symbol = 'SRF'
if action_tag == 'SENT' or action_tag == 'ULITUMA':
formatted_transactions += f'{action_tag} {value} {token_symbol} {recipient_phone_number} {timestamp}.\n'
else:
formatted_transactions += f'{action_tag} {value} {token_symbol} {sender_phone_number} {timestamp}. \n'
return formatted_transactions
else:
if preferred_language == 'en':
formatted_transactions = 'Empty'
else:
formatted_transactions = 'Hamna historia'
return formatted_transactions
def process_account_statement(user: User, display_key: str, ussd_session: dict):
"""
:param user:
:type user:
:param display_key:
:type display_key:
:param ussd_session:
:type ussd_session:
:return:
:rtype:
"""
# retrieve cached statement
identifier = blockchain_address_to_metadata_pointer(blockchain_address=user.blockchain_address)
key = create_cached_data_key(identifier=identifier, salt='cic.statement')
transactions = get_cached_data(key=key)
first_transaction_set = []
middle_transaction_set = []
last_transaction_set = []
transactions = json.loads(transactions)
if len(transactions) > 6:
last_transaction_set += transactions[6:]
middle_transaction_set += transactions[3:][:3]
first_transaction_set += transactions[:3]
# there are probably much cleaner and operational inexpensive ways to do this so find them
elif 4 < len(transactions) < 7:
middle_transaction_set += transactions[3:]
first_transaction_set += transactions[:3]
else:
first_transaction_set += transactions[:3]
if display_key == 'ussd.kenya.first_transaction_set':
return translation_for(
key=display_key,
preferred_language=user.preferred_language,
first_transaction_set=format_transactions(
transactions=first_transaction_set,
preferred_language=user.preferred_language
)
)
elif display_key == 'ussd.kenya.middle_transaction_set':
return translation_for(
key=display_key,
preferred_language=user.preferred_language,
middle_transaction_set=format_transactions(
transactions=middle_transaction_set,
preferred_language=user.preferred_language
)
)
elif display_key == 'ussd.kenya.last_transaction_set':
return translation_for(
key=display_key,
preferred_language=user.preferred_language,
last_transaction_set=format_transactions(
transactions=last_transaction_set,
preferred_language=user.preferred_language
)
)
def process_start_menu(display_key: str, user: User):
"""This function gets data on an account's balance and token in order to append it to the start of the start menu's
title. It passes said arguments to the translation function and returns the appropriate corresponding text from the
@@ -150,16 +276,41 @@ def process_start_menu(display_key: str, user: User):
:return: Corresponding translation text response
:rtype: str
"""
balance_manager = BalanceManager(address=user.blockchain_address,
chain_str=UssdStateMachine.chain_str,
chain_str = Chain.spec.__str__()
blockchain_address = user.blockchain_address
balance_manager = BalanceManager(address=blockchain_address,
chain_str=chain_str,
token_symbol='SRF')
balance = balance_manager.get_operational_balance()
# get balances synchronously for display on start menu
balances_data = balance_manager.get_balances()
key = create_cached_data_key(
identifier=bytes.fromhex(blockchain_address[2:]),
salt='cic.balances_data'
)
cache_data(key=key, data=json.dumps(balances_data))
# get operational balance
operational_balance = compute_operational_balance(balances=balances_data)
# retrieve and cache account's metadata
s_query_user_metadata = celery.signature(
'cic_ussd.tasks.metadata.query_user_metadata',
[blockchain_address]
)
s_query_user_metadata.apply_async(queue='cic-ussd')
# retrieve and cache account's statement
retrieve_account_statement(blockchain_address=blockchain_address)
# TODO [Philip]: figure out how to get token symbol from a metadata layer of sorts.
token_symbol = 'SRF'
logg.debug("Requires integration to determine user's balance and token.")
return translation_for(
key=display_key,
preferred_language=user.preferred_language,
account_balance=balance,
account_balance=operational_balance,
account_token_name=token_symbol
)
@@ -241,5 +392,11 @@ def custom_display_text(
return process_exit_successful_transaction(display_key=display_key, user=user, ussd_session=ussd_session)
elif menu_name == 'start':
return process_start_menu(display_key=display_key, user=user)
elif 'pin_authorization' in menu_name:
return process_pin_authorization(display_key=display_key, user=user)
elif menu_name == 'account_balances':
return process_account_balances(display_key=display_key, user=user, ussd_session=ussd_session)
elif 'transaction_set' in menu_name:
return process_account_statement(display_key=display_key, user=user, ussd_session=ussd_session)
else:
return translation_for(key=display_key, preferred_language=user.preferred_language)

View File

@@ -1,6 +1,52 @@
# standard imports
import hashlib
import logging
# third-party imports
from redis import Redis
logg = logging.getLogger()
class InMemoryStore:
cache: Redis = None
def cache_data(key: str, data: str):
"""
:param key:
:type key:
:param data:
:type data:
:return:
:rtype:
"""
cache = InMemoryStore.cache
cache.set(name=key, value=data)
cache.persist(name=key)
def get_cached_data(key: str):
"""
:param key:
:type key:
:return:
:rtype:
"""
cache = InMemoryStore.cache
return cache.get(name=key)
def create_cached_data_key(identifier: bytes, salt: str):
"""
:param identifier:
:type identifier:
:param salt:
:type salt:
:return:
:rtype:
"""
hash_object = hashlib.new("sha256")
hash_object.update(identifier)
hash_object.update(salt.encode(encoding="utf-8"))
return hash_object.digest().hex()

View File

@@ -12,14 +12,18 @@ import redis
# third-party imports
from confini import Config
from chainlib.chain import ChainSpec
from urllib.parse import quote_plus
# local imports
from cic_ussd.chain import Chain
from cic_ussd.db import dsn_from_config
from cic_ussd.db.models.base import SessionBase
from cic_ussd.encoder import PasswordEncoder
from cic_ussd.files.local_files import create_local_file_data_stores, json_file_parser
from cic_ussd.menu.ussd_menu import UssdMenu
from cic_ussd.metadata.signer import Signer
from cic_ussd.metadata.user import UserMetadata
from cic_ussd.operations import (define_response_with_content,
process_menu_interaction_requests,
define_multilingual_responses)
@@ -59,6 +63,7 @@ config.censor('PASSWORD', 'DATABASE')
# define log levels
if args.vv:
logging.getLogger().setLevel(logging.DEBUG)
logging.getLogger('sqlalchemy.engine').setLevel(logging.DEBUG)
elif args.v:
logging.getLogger().setLevel(logging.INFO)
@@ -92,6 +97,14 @@ InMemoryStore.cache = redis.StrictRedis(host=config.get('REDIS_HOSTNAME'),
decode_responses=True)
InMemoryUssdSession.redis_cache = InMemoryStore.cache
# define metadata URL
UserMetadata.base_url = config.get('CIC_META_URL')
# define signer values
Signer.gpg_path = config.get('PGP_EXPORT_DIR')
Signer.gpg_passphrase = config.get('PGP_PASSPHRASE')
Signer.key_file_path = f"{config.get('PGP_KEYS_PATH')}{config.get('PGP_PRIVATE_KEYS')}"
# initialize celery app
celery.Celery(backend=config.get('CELERY_RESULT_URL'), broker=config.get('CELERY_BROKER_URL'))
@@ -99,7 +112,13 @@ celery.Celery(backend=config.get('CELERY_RESULT_URL'), broker=config.get('CELERY
states = json_file_parser(filepath=config.get('STATEMACHINE_STATES'))
transitions = json_file_parser(filepath=config.get('STATEMACHINE_TRANSITIONS'))
UssdStateMachine.chain_str = config.get('CIC_CHAIN_SPEC')
chain_spec = ChainSpec(
common_name=config.get('CIC_COMMON_NAME'),
engine=config.get('CIC_ENGINE'),
network_id=config.get('CIC_NETWORK_ID')
)
Chain.spec = chain_spec
UssdStateMachine.states = states
UssdStateMachine.transitions = transitions
@@ -152,7 +171,8 @@ def application(env, start_response):
return []
# handle menu interaction requests
response = process_menu_interaction_requests(chain_str=config.get('CIC_CHAIN_SPEC'),
chain_str = chain_spec.__str__()
response = process_menu_interaction_requests(chain_str=chain_str,
external_session_id=external_session_id,
phone_number=phone_number,
queue=args.q,

View File

@@ -12,6 +12,8 @@ from confini import Config
# local imports
from cic_ussd.db import dsn_from_config
from cic_ussd.db.models.base import SessionBase
from cic_ussd.metadata.signer import Signer
from cic_ussd.metadata.user import UserMetadata
from cic_ussd.redis import InMemoryStore
from cic_ussd.session.ussd_session import UssdSession as InMemoryUssdSession
@@ -59,6 +61,14 @@ InMemoryStore.cache = redis.StrictRedis(host=config.get('REDIS_HOSTNAME'),
decode_responses=True)
InMemoryUssdSession.redis_cache = InMemoryStore.cache
# define metadata URL
UserMetadata.base_url = config.get('CIC_META_URL')
# define signer values
Signer.gpg_path = config.get('PGP_EXPORT_DIR')
Signer.gpg_passphrase = config.get('PGP_PASSPHRASE')
Signer.key_file_path = f"{config.get('PGP_KEYS_PATH')}{config.get('PGP_PRIVATE_KEYS')}"
# set up celery
current_app = celery.Celery(__name__)

View File

@@ -1,14 +1,18 @@
# standard imports
import json
import logging
from typing import Tuple
# third party imports
import celery
# local imports
from cic_ussd.accounts import BalanceManager
from cic_ussd.balance import BalanceManager, compute_operational_balance
from cic_ussd.chain import Chain
from cic_ussd.db.models.user import AccountStatus, User
from cic_ussd.operations import get_user_by_phone_number, save_to_in_memory_ussd_session_data
from cic_ussd.state_machine.state_machine import UssdStateMachine
from cic_ussd.operations import save_to_in_memory_ussd_session_data
from cic_ussd.phone_number import get_user_by_phone_number
from cic_ussd.redis import create_cached_data_key, get_cached_data
from cic_ussd.transactions import OutgoingTransactionProcessor
@@ -27,22 +31,7 @@ def is_valid_recipient(state_machine_data: Tuple[str, dict, User]) -> bool:
recipient = get_user_by_phone_number(phone_number=user_input)
is_not_initiator = user_input != user.phone_number
has_active_account_status = user.get_account_status() == AccountStatus.ACTIVE.name
logg.debug('This section requires implementation of checks for user roles and authorization status of an account.')
return is_not_initiator and has_active_account_status
def is_valid_token_agent(state_machine_data: Tuple[str, dict, User]) -> bool:
"""This function checks that a user exists, is not the initiator of the transaction, has an active account status
and is authorized to perform exchange transactions.
:param state_machine_data: A tuple containing user input, a ussd session and user object.
:type state_machine_data: tuple
:return: A user's validity
:rtype: bool
"""
user_input, ussd_session, user = state_machine_data
# is_token_agent = AccountRole.TOKEN_AGENT.value in user.get_user_roles()
logg.debug('This section requires implementation of user roles and authorization to facilitate exchanges.')
return is_valid_recipient(state_machine_data=state_machine_data)
return is_not_initiator and has_active_account_status and recipient is not None
def is_valid_transaction_amount(state_machine_data: Tuple[str, dict, User]) -> bool:
@@ -70,10 +59,17 @@ def has_sufficient_balance(state_machine_data: Tuple[str, dict, User]) -> bool:
"""
user_input, ussd_session, user = state_machine_data
balance_manager = BalanceManager(address=user.blockchain_address,
chain_str=UssdStateMachine.chain_str,
chain_str=Chain.spec.__str__(),
token_symbol='SRF')
balance = balance_manager.get_operational_balance()
return int(user_input) <= balance
# get cached balance
key = create_cached_data_key(
identifier=bytes.fromhex(user.blockchain_address[2:]),
salt='cic.balances_data'
)
cached_balance = get_cached_data(key=key)
operational_balance = compute_operational_balance(balances=json.loads(cached_balance))
return int(user_input) <= operational_balance
def save_recipient_phone_to_session_data(state_machine_data: Tuple[str, dict, User]):
@@ -88,6 +84,25 @@ def save_recipient_phone_to_session_data(state_machine_data: Tuple[str, dict, Us
save_to_in_memory_ussd_session_data(queue='cic-ussd', session_data=session_data, ussd_session=ussd_session)
def retrieve_recipient_metadata(state_machine_data: Tuple[str, dict, User]):
"""
:param state_machine_data:
:type state_machine_data:
:return:
:rtype:
"""
user_input, ussd_session, user = state_machine_data
recipient = get_user_by_phone_number(phone_number=user_input)
blockchain_address = recipient.blockchain_address
# retrieve and cache account's metadata
s_query_user_metadata = celery.signature(
'cic_ussd.tasks.metadata.query_user_metadata',
[blockchain_address]
)
s_query_user_metadata.apply_async(queue='cic-ussd')
def save_transaction_amount_to_session_data(state_machine_data: Tuple[str, dict, User]):
"""This function saves the phone number corresponding the intended recipients blockchain account.
:param state_machine_data: A tuple containing user input, a ussd session and user object.
@@ -113,7 +128,8 @@ def process_transaction_request(state_machine_data: Tuple[str, dict, User]):
to_address = recipient.blockchain_address
from_address = user.blockchain_address
amount = int(ussd_session.get('session_data').get('transaction_amount'))
outgoing_tx_processor = OutgoingTransactionProcessor(chain_str=UssdStateMachine.chain_str,
chain_str = Chain.spec.__str__()
outgoing_tx_processor = OutgoingTransactionProcessor(chain_str=chain_str,
from_address=from_address,
to_address=to_address)
outgoing_tx_processor.process_outgoing_transfer_transaction(amount=amount)

View File

@@ -1,10 +1,20 @@
# standard imports
import json
import logging
from typing import Tuple
# third-party imports
import celery
from cic_types.models.person import Person, generate_metadata_pointer
from cic_types.models.person import generate_vcard_from_contact_data, manage_identity_data
# local imports
from cic_ussd.chain import Chain
from cic_ussd.db.models.user import User
from cic_ussd.error import UserMetadataNotFoundError
from cic_ussd.metadata import blockchain_address_to_metadata_pointer
from cic_ussd.operations import save_to_in_memory_ussd_session_data
from cic_ussd.redis import get_cached_data
logg = logging.getLogger(__file__)
@@ -42,7 +52,29 @@ def update_account_status_to_active(state_machine_data: Tuple[str, dict, User]):
User.session.commit()
def save_profile_attribute_to_session_data(state_machine_data: Tuple[str, dict, User]):
def process_gender_user_input(user: User, user_input: str):
"""
:param user:
:type user:
:param user_input:
:type user_input:
:return:
:rtype:
"""
if user.preferred_language == 'en':
if user_input == '1':
gender = 'Male'
else:
gender = 'Female'
else:
if user_input == '1':
gender = 'Mwanaume'
else:
gender = 'Mwanamke'
return gender
def save_metadata_attribute_to_session_data(state_machine_data: Tuple[str, dict, User]):
"""This function saves first name data to the ussd session in the redis cache.
:param state_machine_data: A tuple containing user input, a ussd session and user object.
:type state_machine_data: tuple
@@ -54,16 +86,17 @@ def save_profile_attribute_to_session_data(state_machine_data: Tuple[str, dict,
# define session data key from current state
key = ''
if 'first_name' in current_state:
key = 'first_name'
elif 'last_name' in current_state:
key = 'last_name'
if 'given_name' in current_state:
key = 'given_name'
elif 'family_name' in current_state:
key = 'family_name'
elif 'gender' in current_state:
key = 'gender'
user_input = process_gender_user_input(user=user, user_input=user_input)
elif 'location' in current_state:
key = 'location'
elif 'business_profile' in current_state:
key = 'business_profile'
elif 'products' in current_state:
key = 'products'
# check if there is existing session data
if ussd_session.get('session_data'):
@@ -76,14 +109,120 @@ def save_profile_attribute_to_session_data(state_machine_data: Tuple[str, dict,
save_to_in_memory_ussd_session_data(queue='cic-ussd', session_data=session_data, ussd_session=ussd_session)
def persist_profile_data(state_machine_data: Tuple[str, dict, User]):
"""This function persists elements of the user profile stored in session data
def format_user_metadata(metadata: dict, user: User):
"""
:param metadata:
:type metadata:
:param user:
:type user:
:return:
:rtype:
"""
gender = metadata.get('gender')
given_name = metadata.get('given_name')
family_name = metadata.get('family_name')
location = {
"area_name": metadata.get('location')
}
products = []
if metadata.get('products'):
products = metadata.get('products').split(',')
phone_number = user.phone_number
date_registered = int(user.created.replace().timestamp())
blockchain_address = user.blockchain_address
chain_spec = f'{Chain.spec.common_name()}:{Chain.spec.network_id()}'
identities = manage_identity_data(
blockchain_address=blockchain_address,
blockchain_type=Chain.spec.engine(),
chain_spec=chain_spec
)
return {
"date_registered": date_registered,
"gender": gender,
"identities": identities,
"location": location,
"products": products,
"vcard": generate_vcard_from_contact_data(
family_name=family_name,
given_name=given_name,
tel=phone_number
)
}
def save_complete_user_metadata(state_machine_data: Tuple[str, dict, User]):
"""This function persists elements of the user metadata stored in session data
:param state_machine_data: A tuple containing user input, a ussd session and user object.
:type state_machine_data: tuple
"""
user_input, ussd_session, user = state_machine_data
# get session data
profile_data = ussd_session.get('session_data')
logg.debug('This section requires implementation of user metadata.')
metadata = ussd_session.get('session_data')
# format metadata appropriately
user_metadata = format_user_metadata(metadata=metadata, user=user)
blockchain_address = user.blockchain_address
s_create_user_metadata = celery.signature(
'cic_ussd.tasks.metadata.create_user_metadata',
[blockchain_address, user_metadata]
)
s_create_user_metadata.apply_async(queue='cic-ussd')
def edit_user_metadata_attribute(state_machine_data: Tuple[str, dict, User]):
user_input, ussd_session, user = state_machine_data
blockchain_address = user.blockchain_address
key = generate_metadata_pointer(
identifier=blockchain_address_to_metadata_pointer(blockchain_address=user.blockchain_address),
cic_type='cic.person'
)
user_metadata = get_cached_data(key=key)
if not user_metadata:
raise UserMetadataNotFoundError(f'Expected user metadata but found none in cache for key: {blockchain_address}')
given_name = ussd_session.get('session_data').get('given_name')
family_name = ussd_session.get('session_data').get('family_name')
gender = ussd_session.get('session_data').get('gender')
location = ussd_session.get('session_data').get('location')
products = ussd_session.get('session_data').get('products')
# validate user metadata
person = Person()
user_metadata = json.loads(user_metadata)
deserialized_person = person.deserialize(metadata=user_metadata)
# edit specific metadata attribute
if given_name:
deserialized_person.given_name = given_name
elif family_name:
deserialized_person.family_name = family_name
elif gender:
deserialized_person.gender = gender
elif location:
# get existing location metadata:
location_data = user_metadata.get('location')
location_data['area_name'] = location
deserialized_person.location = location_data
elif products:
deserialized_person.products = products
edited_metadata = deserialized_person.serialize()
s_edit_user_metadata = celery.signature(
'cic_ussd.tasks.metadata.edit_user_metadata',
[blockchain_address, edited_metadata, 'pgp']
)
s_edit_user_metadata.apply_async(queue='cic-ussd')
def get_user_metadata(state_machine_data: Tuple[str, dict, User]):
user_input, ussd_session, user = state_machine_data
blockchain_address = user.blockchain_address
s_get_user_metadata = celery.signature(
'cic_ussd.tasks.metadata.query_user_metadata',
[blockchain_address]
)
s_get_user_metadata.apply_async(queue='cic-ussd')

View File

@@ -3,55 +3,30 @@ import logging
import re
from typing import Tuple
# third-party imports
from cic_types.models.person import generate_metadata_pointer
# local imports
from cic_ussd.db.models.user import User
from cic_ussd.metadata import blockchain_address_to_metadata_pointer
from cic_ussd.redis import get_cached_data
logg = logging.getLogger()
def has_complete_profile_data(state_machine_data: Tuple[str, dict, User]):
def has_cached_user_metadata(state_machine_data: Tuple[str, dict, User]):
"""This function checks whether the attributes of the user's metadata constituting a profile are filled out.
:param state_machine_data: A tuple containing user input, a ussd session and user object.
:type state_machine_data: str
"""
user_input, ussd_session, user = state_machine_data
logg.debug('This section requires implementation of user metadata.')
def has_empty_username_data(state_machine_data: Tuple[str, dict, User]):
"""This function checks whether the aspects of the user's name metadata is filled out.
:param state_machine_data: A tuple containing user input, a ussd session and user object.
:type state_machine_data: str
"""
user_input, ussd_session, user = state_machine_data
logg.debug('This section requires implementation of user metadata.')
def has_empty_gender_data(state_machine_data: Tuple[str, dict, User]):
"""This function checks whether the aspects of the user's gender metadata is filled out.
:param state_machine_data: A tuple containing user input, a ussd session and user object.
:type state_machine_data: str
"""
user_input, ussd_session, user = state_machine_data
logg.debug('This section requires implementation of user metadata.')
def has_empty_location_data(state_machine_data: Tuple[str, dict, User]):
"""This function checks whether the aspects of the user's location metadata is filled out.
:param state_machine_data: A tuple containing user input, a ussd session and user object.
:type state_machine_data: str
"""
user_input, ussd_session, user = state_machine_data
logg.debug('This section requires implementation of user metadata.')
def has_empty_business_profile_data(state_machine_data: Tuple[str, dict, User]):
"""This function checks whether the aspects of the user's business profile metadata is filled out.
:param state_machine_data: A tuple containing user input, a ussd session and user object.
:type state_machine_data: str
"""
user_input, ussd_session, user = state_machine_data
logg.debug('This section requires implementation of user metadata.')
# check for user metadata in cache
key = generate_metadata_pointer(
identifier=blockchain_address_to_metadata_pointer(blockchain_address=user.blockchain_address),
cic_type='cic.person'
)
user_metadata = get_cached_data(key=key)
return user_metadata is not None
def is_valid_name(state_machine_data: Tuple[str, dict, User]):
@@ -66,3 +41,18 @@ def is_valid_name(state_machine_data: Tuple[str, dict, User]):
return True
else:
return False
def is_valid_gender_selection(state_machine_data: Tuple[str, dict, User]):
"""
:param state_machine_data:
:type state_machine_data:
:return:
:rtype:
"""
user_input, ussd_session, user = state_machine_data
selection_matcher = "^[1-2]$"
if re.match(selection_matcher, user_input):
return True
else:
return False

View File

@@ -14,15 +14,11 @@ class UssdStateMachine(Machine):
menu as well as providing a means for navigating through these states based on different user inputs.
It defines different helper functions that co-ordinate with the stakeholder components of the ussd menu: i.e the
User, UssdSession, UssdMenu to facilitate user interaction with ussd menu.
:cvar chain_str: The chain name and network id.
:type chain_str: str
:cvar states: A list of pre-defined states.
:type states: list
:cvar transitions: A list of pre-defined transitions.
:type transitions: list
"""
chain_str = None
states = []
transitions = []

View File

@@ -10,6 +10,7 @@ import celery
celery_app = celery.current_app
# export external celery task modules
from .foo import log_it_plz
from .ussd import persist_session_to_db
from .callback_handler import process_account_creation_callback
from .logger import *
from .ussd_session import *
from .callback_handler import *
from .metadata import *

View File

@@ -0,0 +1,20 @@
# standard imports
# third-party imports
import celery
import sqlalchemy
# local imports
class CriticalTask(celery.Task):
retry_jitter = True
retry_backoff = True
retry_backoff_max = 8
class CriticalSQLAlchemyTask(CriticalTask):
autoretry_for = (
sqlalchemy.exc.DatabaseError,
sqlalchemy.exc.TimeoutError,
)

View File

@@ -1,23 +1,26 @@
# standard imports
import json
import logging
from datetime import timedelta
from datetime import datetime, timedelta
# third-party imports
import celery
# local imports
from cic_ussd.conversions import from_wei
from cic_ussd.db.models.base import SessionBase
from cic_ussd.db.models.user import User
from cic_ussd.account import define_account_tx_metadata
from cic_ussd.error import ActionDataNotFoundError
from cic_ussd.redis import InMemoryStore
from cic_ussd.redis import InMemoryStore, cache_data, create_cached_data_key
from cic_ussd.tasks.base import CriticalSQLAlchemyTask
from cic_ussd.transactions import IncomingTransactionProcessor
logg = logging.getLogger(__file__)
celery_app = celery.current_app
@celery_app.task(bind=True)
@celery_app.task(bind=True, base=CriticalSQLAlchemyTask)
def process_account_creation_callback(self, result: str, url: str, status_code: int):
"""This function defines a task that creates a user and
:param result: The blockchain address for the created account
@@ -49,14 +52,14 @@ def process_account_creation_callback(self, result: str, url: str, status_code:
user = User(blockchain_address=result, phone_number=phone_number)
session.add(user)
session.commit()
session.close()
# expire cache
cache.expire(task_id, timedelta(seconds=30))
session.close()
cache.expire(task_id, timedelta(seconds=180))
else:
cache.expire(task_id, timedelta(seconds=30))
session.close()
cache.expire(task_id, timedelta(seconds=180))
else:
session.close()
@@ -65,9 +68,8 @@ def process_account_creation_callback(self, result: str, url: str, status_code:
@celery_app.task
def process_incoming_transfer_callback(result: dict, param: str, status_code: int):
logg.debug(f'PARAM: {param}, RESULT: {result}, STATUS_CODE: {status_code}')
session = SessionBase.create_session()
if result and status_code == 0:
if status_code == 0:
# collect result data
recipient_blockchain_address = result.get('recipient')
@@ -93,22 +95,123 @@ def process_incoming_transfer_callback(result: dict, param: str, status_code: in
value=value)
if param == 'tokengift':
logg.debug('Name information would require integration with cic meta.')
incoming_tx_processor.process_token_gift_incoming_transactions(first_name="")
incoming_tx_processor.process_token_gift_incoming_transactions()
elif param == 'transfer':
logg.debug('Name information would require integration with cic meta.')
if sender_user:
sender_information = f'{sender_user.phone_number}, {""}, {""}'
incoming_tx_processor.process_transfer_incoming_transaction(sender_information=sender_information)
sender_information = define_account_tx_metadata(user=sender_user)
incoming_tx_processor.process_transfer_incoming_transaction(
sender_information=sender_information,
recipient_blockchain_address=recipient_blockchain_address
)
else:
logg.warning(
f'Tx with sender: {sender_blockchain_address} was received but has no matching user in the system.'
)
incoming_tx_processor.process_transfer_incoming_transaction(
sender_information=sender_blockchain_address)
sender_information='GRASSROOTS ECONOMICS',
recipient_blockchain_address=recipient_blockchain_address
)
else:
session.close()
raise ValueError(f'Unexpected transaction param: {param}.')
else:
session.close()
raise ValueError(f'Unexpected status code: {status_code}.')
@celery_app.task
def process_balances_callback(result: list, param: str, status_code: int):
if status_code == 0:
balances_data = result[0]
blockchain_address = balances_data.get('address')
key = create_cached_data_key(
identifier=bytes.fromhex(blockchain_address[2:]),
salt='cic.balances_data'
)
cache_data(key=key, data=json.dumps(balances_data))
else:
raise ValueError(f'Unexpected status code: {status_code}.')
# TODO: clean up this handler
def define_transaction_action_tag(
preferred_language: str,
sender_blockchain_address: str,
param: str):
# check if out going ot incoming transaction
if sender_blockchain_address == param:
# check preferred language
if preferred_language == 'en':
action_tag = 'SENT'
else:
action_tag = 'ULITUMA'
else:
if preferred_language == 'en':
action_tag = 'RECEIVED'
else:
action_tag = 'ULIPOKEA'
return action_tag
@celery_app.task
def process_statement_callback(result, param: str, status_code: int):
if status_code == 0:
# create session
session = SessionBase.create_session()
processed_transactions = []
# process transaction data to cache
for transaction in result:
sender_blockchain_address = transaction.get('sender')
recipient_address = transaction.get('recipient')
source_token = transaction.get('source_token')
# filter out any transactions that are "gassy"
if '0x0000000000000000000000000000000000000000' in source_token:
pass
else:
# describe a processed transaction
processed_transaction = {}
# check if sender is in the system
sender: User = session.query(User).filter_by(blockchain_address=sender_blockchain_address).first()
if sender:
processed_transaction['sender_phone_number'] = sender.phone_number
action_tag = define_transaction_action_tag(
preferred_language=sender.preferred_language,
sender_blockchain_address=sender_blockchain_address,
param=param
)
processed_transaction['action_tag'] = action_tag
else:
processed_transaction['sender_phone_number'] = 'GRASSROOTS ECONOMICS'
# check if recipient is in the system
recipient: User = session.query(User).filter_by(blockchain_address=recipient_address).first()
if recipient:
processed_transaction['recipient_phone_number'] = recipient.phone_number
else:
logg.warning(f'Tx with recipient not found in cic-ussd')
# add transaction values
processed_transaction['to_value'] = from_wei(value=transaction.get('to_value'))
processed_transaction['from_value'] = from_wei(value=transaction.get('from_value'))
raw_timestamp = transaction.get('timestamp')
timestamp = datetime.utcfromtimestamp(raw_timestamp).strftime('%d/%m/%y, %H:%M')
processed_transaction['timestamp'] = timestamp
processed_transactions.append(processed_transaction)
# cache account statement
identifier = bytes.fromhex(param[2:])
key = create_cached_data_key(identifier=identifier, salt='cic.statement')
data = json.dumps(processed_transactions)
# cache statement data
cache_data(key=key, data=data)
else:
raise ValueError(f'Unexpected status code: {status_code}.')

View File

@@ -0,0 +1,48 @@
# standard imports
import json
import logging
# third-party imports
import celery
# local imports
from cic_ussd.metadata import blockchain_address_to_metadata_pointer
from cic_ussd.metadata.user import UserMetadata
celery_app = celery.current_app
logg = logging.getLogger()
@celery_app.task
def query_user_metadata(blockchain_address: str):
"""
:param blockchain_address:
:type blockchain_address:
:return:
:rtype:
"""
identifier = blockchain_address_to_metadata_pointer(blockchain_address=blockchain_address)
user_metadata_client = UserMetadata(identifier=identifier)
user_metadata_client.query()
@celery_app.task
def create_user_metadata(blockchain_address: str, data: dict):
"""
:param blockchain_address:
:type blockchain_address:
:param data:
:type data:
:return:
:rtype:
"""
identifier = blockchain_address_to_metadata_pointer(blockchain_address=blockchain_address)
user_metadata_client = UserMetadata(identifier=identifier)
user_metadata_client.create(data=data)
@celery_app.task
def edit_user_metadata(blockchain_address: str, data: bytes, engine: str):
identifier = blockchain_address_to_metadata_pointer(blockchain_address=blockchain_address)
user_metadata_client = UserMetadata(identifier=identifier)
user_metadata_client.edit(data=data, engine=engine)

View File

@@ -11,12 +11,13 @@ from cic_ussd.db.models.base import SessionBase
from cic_ussd.db.models.ussd_session import UssdSession
from cic_ussd.error import SessionNotFoundError
from cic_ussd.session.ussd_session import UssdSession as InMemoryUssdSession
from cic_ussd.tasks.base import CriticalSQLAlchemyTask
celery_app = celery.current_app
logg = get_logger(__file__)
@celery_app.task
@celery_app.task(base=CriticalSQLAlchemyTask)
def persist_session_to_db(external_session_id: str):
"""
This task initiates the saving of the session object to the database and it's removal from the in-memory storage.
@@ -62,11 +63,10 @@ def persist_session_to_db(external_session_id: str):
in_db_ussd_session.set_data(key=key, value=value, session=session)
session.add(in_db_ussd_session)
session.commit()
session.close()
InMemoryUssdSession.redis_cache.expire(external_session_id, timedelta(minutes=1))
else:
session.close()
raise SessionNotFoundError('Session does not exist!')
session.commit()
session.close()

View File

@@ -7,6 +7,7 @@ from datetime import datetime
from cic_eth.api import Api
# local imports
from cic_ussd.balance import get_cached_operational_balance
from cic_ussd.notifications import Notifier
@@ -35,7 +36,7 @@ def from_wei(value: int) -> float:
:return: SRF equivalent of value in Wei
:rtype: float
"""
value = float(value) / 1e+18
value = float(value) / 1e+6
return truncate(value=value, decimals=2)
@@ -67,11 +68,9 @@ class IncomingTransactionProcessor:
self.token_symbol = token_symbol
self.value = value
def process_token_gift_incoming_transactions(self, first_name: str):
def process_token_gift_incoming_transactions(self):
"""This function processes incoming transactions with a "tokengift" param, it collects all appropriate data to
send out notifications to users when their accounts are successfully created.
:param first_name: The first name of the recipient of the token gift transaction.
:type first_name: str
"""
balance = from_wei(value=self.value)
@@ -80,20 +79,22 @@ class IncomingTransactionProcessor:
phone_number=self.phone_number,
preferred_language=self.preferred_language,
balance=balance,
first_name=first_name,
token_symbol=self.token_symbol)
def process_transfer_incoming_transaction(self, sender_information: str):
def process_transfer_incoming_transaction(self, sender_information: str, recipient_blockchain_address: str):
"""This function processes incoming transactions with the "transfer" param and issues notifications to users
about reception of funds into their accounts.
:param sender_information: A string with a user's full name and phone number.
:type sender_information: str
:param recipient_blockchain_address:
type recipient_blockchain_address: str
"""
key = 'sms.received_tokens'
amount = from_wei(value=self.value)
timestamp = datetime.now().strftime('%d-%m-%y, %H:%M %p')
logg.debug('Balance requires implementation of cic-eth integration with balance.')
operational_balance = get_cached_operational_balance(blockchain_address=recipient_blockchain_address)
notifier.send_sms_notification(key=key,
phone_number=self.phone_number,
preferred_language=self.preferred_language,
@@ -101,7 +102,7 @@ class IncomingTransactionProcessor:
token_symbol=self.token_symbol,
tx_sender_information=sender_information,
timestamp=timestamp,
balance='')
balance=operational_balance)
class OutgoingTransactionProcessor:

View File

@@ -110,7 +110,7 @@ def validate_phone_number(phone: str):
def validate_response_type(processor_response: str) -> bool:
"""
"""1*3443*3443*Philip*Wanga*1*Juja*Software Developer*2*3
This function checks the prefix for a corresponding menu's text from the response offered by the Ussd Processor and
determines whether the response should prompt the end of a ussd session or the
:param processor_response: A ussd menu's text value.

View File

@@ -11,8 +11,12 @@ RUN apk update && \
apk add git linux-headers postgresql-dev gnupg bash
RUN apk add --update musl-dev gcc libffi-dev
# create secrets directory
RUN mkdir -vp pgp/keys
# create application directory
RUN mkdir -vp cic-ussd
RUN mkdir -vp data
COPY cic-ussd/setup.cfg \
cic-ussd/setup.py \
@@ -28,6 +32,7 @@ RUN cd cic-ussd && \
# copy all necessary files
COPY cic-ussd/cic_ussd/ cic-ussd/cic_ussd/
COPY cic-ussd/cic_ussd/db/ussd_menu.json data/
COPY cic-ussd/scripts/ cic-ussd/scripts/
COPY cic-ussd/states/ cic-ussd/states/
COPY cic-ussd/transitions/ cic-ussd/transitions/

View File

@@ -6,10 +6,12 @@ betterpath==0.2.2
billiard==3.6.3.0
celery==4.4.7
cffi==1.14.3
cic-eth~=0.10.0a22
chainlib~=0.0.1a15
cic-eth==0.10.0a38
cic-notify==0.3.1
cic-types==0.1.0a8
click==7.1.2
confini~=0.3.6a1
confini==0.3.5
cryptography==3.2.1
faker==4.17.1
iniconfig==1.1.1
@@ -34,6 +36,7 @@ python-i18n==0.3.9
pytz==2020.1
PyYAML==5.3.1
redis==3.5.3
requests==2.24.0
semver==2.13.0
six==1.15.0
SQLAlchemy==1.3.20

View File

@@ -33,6 +33,7 @@ packages =
cic_ussd.db.models
cic_ussd.files
cic_ussd.menu
cic_ussd.metadata
cic_ussd.runnable
cic_ussd.session
cic_ussd.state_machine

View File

@@ -1,10 +1,13 @@
[
"account_management",
"profile_management",
"metadata_management",
"select_preferred_language",
"enter_current_pin",
"mini_statement_inquiry_pin_authorization",
"enter_new_pin",
"new_pin_confirmation",
"display_user_profile_data"
"display_user_metadata",
"standard_pin_authorization",
"account_balances_pin_authorization",
"account_statement_pin_authorization",
"account_balances"
]

View File

@@ -0,0 +1,5 @@
[
"first_transaction_set",
"middle_transaction_set",
"last_transaction_set"
]

View File

@@ -0,0 +1,8 @@
[
"enter_given_name",
"enter_family_name",
"enter_gender",
"enter_age",
"enter_location",
"enter_products"
]

View File

@@ -1,8 +0,0 @@
[
"enter_first_name",
"enter_last_name",
"enter_gender",
"enter_location",
"enter_business_profile",
"view_profile_pin_authorization"
]

View File

@@ -3,4 +3,5 @@ pytest-alembic==0.2.5
pytest-celery==0.0.0a1
pytest-cov==2.10.1
pytest-mock==3.3.1
pytest-redis==2.0.0
pytest-redis==2.0.0
requests-mock==1.8.0

View File

@@ -0,0 +1,30 @@
# standard imports
import os
# third-party imports
# local imports
from cic_ussd.db import dsn_from_config
def test_dsn_from_config(load_config):
"""
"""
# test dsn for sqlite engine
dsn = dsn_from_config(load_config)
scheme = f'{load_config.get("DATABASE_ENGINE")}+{load_config.get("DATABASE_DRIVER")}'
assert dsn == f'{scheme}:///{load_config.get("DATABASE_NAME")}'
# test dsn for other db formats
overrides = {
'DATABASE_PASSWORD': 'password',
'DATABASE_DRIVER': 'psycopg2',
'DATABASE_ENGINE': 'postgresql'
}
load_config.dict_override(dct=overrides, dct_description='Override values to test different db formats.')
scheme = f'{load_config.get("DATABASE_ENGINE")}+{load_config.get("DATABASE_DRIVER")}'
dsn = dsn_from_config(load_config)
assert dsn == f"{scheme}://{load_config.get('DATABASE_USER')}:{load_config.get('DATABASE_PASSWORD')}@{load_config.get('DATABASE_HOST')}:{load_config.get('DATABASE_PORT')}/{load_config.get('DATABASE_NAME')}"

View File

@@ -0,0 +1,80 @@
# standard imports
import json
# third-party imports
import pytest
import requests
import requests_mock
# local imports
from cic_ussd.error import UnsupportedMethodError
from cic_ussd.metadata import blockchain_address_to_metadata_pointer, make_request
def test_make_request(define_metadata_pointer_url, mock_meta_get_response, mock_meta_post_response, person_metadata):
with requests_mock.Mocker(real_http=False) as request_mocker:
request_mocker.register_uri(
'GET',
define_metadata_pointer_url,
status_code=200,
reason='OK',
content=json.dumps(mock_meta_get_response).encode('utf-8')
)
response = make_request(method='GET', url=define_metadata_pointer_url)
assert response.content == requests.get(define_metadata_pointer_url).content
with requests_mock.Mocker(real_http=False) as request_mocker:
request_mocker.register_uri(
'POST',
define_metadata_pointer_url,
status_code=201,
reason='CREATED',
content=json.dumps(mock_meta_post_response).encode('utf-8')
)
response = make_request(
method='POST',
url=define_metadata_pointer_url,
data=json.dumps(person_metadata).encode('utf-8'),
headers={
'X-CIC-AUTOMERGE': 'server',
'Content-Type': 'application/json'
}
)
assert response.content == requests.post(define_metadata_pointer_url).content
with requests_mock.Mocker(real_http=False) as request_mocker:
request_mocker.register_uri(
'PUT',
define_metadata_pointer_url,
status_code=200,
reason='OK'
)
response = make_request(
method='PUT',
url=define_metadata_pointer_url,
data=json.dumps(person_metadata).encode('utf-8'),
headers={
'X-CIC-AUTOMERGE': 'server',
'Content-Type': 'application/json'
}
)
assert response.content == requests.put(define_metadata_pointer_url).content
with pytest.raises(UnsupportedMethodError) as error:
with requests_mock.Mocker(real_http=False) as request_mocker:
request_mocker.register_uri(
'DELETE',
define_metadata_pointer_url,
status_code=200,
reason='OK'
)
make_request(
method='DELETE',
url=define_metadata_pointer_url
)
assert str(error.value) == 'Unsupported method: DELETE'
def test_blockchain_address_to_metadata_pointer(create_activated_user):
blockchain_address = create_activated_user.blockchain_address
assert type(blockchain_address_to_metadata_pointer(blockchain_address)) == bytes

View File

@@ -0,0 +1,34 @@
# standard imports
import shutil
# third-party imports
# local imports
from cic_ussd.metadata.signer import Signer
def test_client(load_config, setup_metadata_signer, person_metadata):
signer = Signer()
# get gpg used
digest = 'a4337bc45a8fc544c03f52dc550cd6e1e87021bc896588bd79e901e2'
person_metadata['digest'] = digest
gpg = signer.gpg
# check that key data was loaded
assert signer.key_data is not None
# check that correct operational key is returned
gpg.import_keys(key_data=signer.key_data)
gpg_keys = gpg.list_keys()
assert signer.get_operational_key() == gpg_keys[0]
# check that correct signature is returned
key_id = signer.get_operational_key().get('keyid')
signature = gpg.sign(message=digest, passphrase=load_config.get('KEYS_PASSPHRASE'), keyid=key_id)
assert str(signature) == signer.sign_digest(data=person_metadata)
# remove tmp gpg file
shutil.rmtree(Signer.gpg_path)

View File

@@ -0,0 +1,123 @@
# standard imports
import json
# third-party imports
import pytest
import requests_mock
from cic_types.models.person import generate_metadata_pointer
# local imports
from cic_ussd.metadata import blockchain_address_to_metadata_pointer
from cic_ussd.metadata.signer import Signer
from cic_ussd.metadata.user import UserMetadata
from cic_ussd.redis import get_cached_data
def test_user_metadata(create_activated_user, define_metadata_pointer_url, load_config):
UserMetadata.base_url = load_config.get('CIC_META_URL')
identifier = blockchain_address_to_metadata_pointer(blockchain_address=create_activated_user.blockchain_address)
user_metadata_client = UserMetadata(identifier=identifier)
assert user_metadata_client.url == define_metadata_pointer_url
def test_create_user_metadata(caplog,
create_activated_user,
define_metadata_pointer_url,
load_config,
mock_meta_post_response,
person_metadata):
identifier = blockchain_address_to_metadata_pointer(blockchain_address=create_activated_user.blockchain_address)
user_metadata_client = UserMetadata(identifier=identifier)
with requests_mock.Mocker(real_http=False) as request_mocker:
request_mocker.register_uri(
'POST',
define_metadata_pointer_url,
status_code=201,
reason='CREATED',
content=json.dumps(mock_meta_post_response).encode('utf-8')
)
user_metadata_client.create(data=person_metadata)
assert 'Get signed material response status: 201' in caplog.text
with pytest.raises(RuntimeError) as error:
with requests_mock.Mocker(real_http=False) as request_mocker:
request_mocker.register_uri(
'POST',
define_metadata_pointer_url,
status_code=400,
reason='BAD REQUEST'
)
user_metadata_client.create(data=person_metadata)
assert str(error.value) == f'400 Client Error: BAD REQUEST for url: {define_metadata_pointer_url}'
def test_edit_user_metadata(caplog,
create_activated_user,
define_metadata_pointer_url,
load_config,
person_metadata,
setup_metadata_signer):
Signer.gpg_passphrase = load_config.get('KEYS_PASSPHRASE')
identifier = blockchain_address_to_metadata_pointer(blockchain_address=create_activated_user.blockchain_address)
user_metadata_client = UserMetadata(identifier=identifier)
with requests_mock.Mocker(real_http=False) as request_mocker:
request_mocker.register_uri(
'PUT',
define_metadata_pointer_url,
status_code=200,
reason='OK'
)
user_metadata_client.edit(data=person_metadata, engine='pgp')
assert 'Signed content submission status: 200' in caplog.text
with pytest.raises(RuntimeError) as error:
with requests_mock.Mocker(real_http=False) as request_mocker:
request_mocker.register_uri(
'PUT',
define_metadata_pointer_url,
status_code=400,
reason='BAD REQUEST'
)
user_metadata_client.edit(data=person_metadata, engine='pgp')
assert str(error.value) == f'400 Client Error: BAD REQUEST for url: {define_metadata_pointer_url}'
def test_get_user_metadata(caplog,
create_activated_user,
define_metadata_pointer_url,
init_redis_cache,
load_config,
person_metadata,
setup_metadata_signer):
identifier = blockchain_address_to_metadata_pointer(blockchain_address=create_activated_user.blockchain_address)
user_metadata_client = UserMetadata(identifier=identifier)
with requests_mock.Mocker(real_http=False) as request_mocker:
request_mocker.register_uri(
'GET',
define_metadata_pointer_url,
status_code=200,
content=json.dumps(person_metadata).encode('utf-8'),
reason='OK'
)
user_metadata_client.query()
assert 'Get latest data status: 200' in caplog.text
key = generate_metadata_pointer(
identifier=identifier,
cic_type='cic.person'
)
cached_user_metadata = get_cached_data(key=key)
assert cached_user_metadata
with pytest.raises(RuntimeError) as error:
with requests_mock.Mocker(real_http=False) as request_mocker:
request_mocker.register_uri(
'GET',
define_metadata_pointer_url,
status_code=404,
reason='NOT FOUND'
)
user_metadata_client.query()
assert 'The data is not available and might need to be added.' in caplog.text
assert str(error.value) == f'400 Client Error: NOT FOUND for url: {define_metadata_pointer_url}'

View File

@@ -5,7 +5,6 @@ import json
import pytest
# local imports
from cic_ussd.state_machine import UssdStateMachine
from cic_ussd.state_machine.logic.transaction import (has_sufficient_balance,
is_valid_recipient,
is_valid_transaction_amount,
@@ -100,8 +99,8 @@ def test_process_transaction_request(create_valid_tx_recipient,
create_valid_tx_sender,
load_config,
mock_outgoing_transactions,
setup_chain_spec,
ussd_session_data):
UssdStateMachine.chain_str = load_config.get('CIC_CHAIN_SPEC')
ussd_session_data['session_data'] = {
'recipient_phone_number': create_valid_tx_recipient.phone_number,
'transaction_amount': '50'

View File

@@ -0,0 +1,155 @@
# standard imports
import json
# third-party-imports
import pytest
# local imports
from cic_ussd.chain import Chain
from cic_ussd.redis import InMemoryStore
from cic_ussd.state_machine.logic.user import (
change_preferred_language_to_en,
change_preferred_language_to_sw,
edit_user_metadata_attribute,
format_user_metadata,
get_user_metadata,
save_complete_user_metadata,
process_gender_user_input,
save_profile_attribute_to_session_data,
update_account_status_to_active)
def test_change_preferred_language(create_pending_user, create_in_db_ussd_session):
state_machine_data = ('', create_in_db_ussd_session, create_pending_user)
assert create_pending_user.preferred_language is None
change_preferred_language_to_en(state_machine_data)
assert create_pending_user.preferred_language == 'en'
change_preferred_language_to_sw(state_machine_data)
assert create_pending_user.preferred_language == 'sw'
def test_update_account_status_to_active(create_pending_user, create_in_db_ussd_session):
state_machine_data = ('', create_in_db_ussd_session, create_pending_user)
update_account_status_to_active(state_machine_data)
assert create_pending_user.get_account_status() == 'ACTIVE'
@pytest.mark.parametrize("current_state, expected_key, expected_result, user_input", [
("enter_given_name", "given_name", "John", "John"),
("enter_family_name", "family_name", "Doe", "Doe"),
("enter_gender", "gender", "Male", "1"),
("enter_location", "location", "Kangemi", "Kangemi"),
("enter_products", "products", "Mandazi", "Mandazi"),
])
def test_save_save_profile_attribute_to_session_data(current_state,
expected_key,
expected_result,
user_input,
celery_session_worker,
create_activated_user,
create_in_db_ussd_session,
create_in_redis_ussd_session):
create_in_db_ussd_session.state = current_state
serialized_in_db_ussd_session = create_in_db_ussd_session.to_json()
state_machine_data = (user_input, serialized_in_db_ussd_session, create_activated_user)
in_memory_ussd_session = InMemoryStore.cache.get('AT974186')
in_memory_ussd_session = json.loads(in_memory_ussd_session)
assert in_memory_ussd_session.get('session_data') == {}
serialized_in_db_ussd_session['state'] = current_state
save_profile_attribute_to_session_data(state_machine_data=state_machine_data)
in_memory_ussd_session = InMemoryStore.cache.get('AT974186')
in_memory_ussd_session = json.loads(in_memory_ussd_session)
assert in_memory_ussd_session.get('session_data')[expected_key] == expected_result
@pytest.mark.parametrize("preferred_language, user_input, expected_gender_value", [
("en", "1", "Male"),
("en", "2", "Female"),
("sw", "1", "Mwanaume"),
("sw", "2", "Mwanamke"),
])
def test_process_gender_user_input(create_activated_user, expected_gender_value, preferred_language, user_input):
create_activated_user.preferred_language = preferred_language
gender = process_gender_user_input(user=create_activated_user, user_input=user_input)
assert gender == expected_gender_value
def test_format_user_metadata(create_activated_user,
complete_user_metadata,
setup_chain_spec):
from cic_types.models.person import Person
formatted_user_metadata = format_user_metadata(metadata=complete_user_metadata, user=create_activated_user)
person = Person()
user_metadata = person.deserialize(metadata=formatted_user_metadata)
assert formatted_user_metadata == user_metadata.serialize()
def test_save_complete_user_metadata(celery_session_worker,
complete_user_metadata,
create_activated_user,
create_in_redis_ussd_session,
mocker,
setup_chain_spec,
ussd_session_data):
ussd_session = create_in_redis_ussd_session.get(ussd_session_data.get('external_session_id'))
ussd_session = json.loads(ussd_session)
ussd_session['session_data'] = complete_user_metadata
user_metadata = format_user_metadata(metadata=ussd_session.get('session_data'), user=create_activated_user)
state_machine_data = ('', ussd_session, create_activated_user)
mocked_create_metadata_task = mocker.patch('cic_ussd.tasks.metadata.create_user_metadata.apply_async')
save_complete_user_metadata(state_machine_data=state_machine_data)
mocked_create_metadata_task.assert_called_with(
(user_metadata, create_activated_user.blockchain_address),
{},
queue='cic-ussd'
)
def test_edit_user_metadata_attribute(celery_session_worker,
cached_user_metadata,
create_activated_user,
create_in_redis_ussd_session,
init_redis_cache,
mocker,
person_metadata,
setup_chain_spec,
ussd_session_data):
ussd_session = create_in_redis_ussd_session.get(ussd_session_data.get('external_session_id'))
ussd_session = json.loads(ussd_session)
assert person_metadata['location']['area_name'] == 'kayaba'
# appropriately format session
ussd_session['session_data'] = {
'location': 'nairobi'
}
state_machine_data = ('', ussd_session, create_activated_user)
mocked_edit_metadata = mocker.patch('cic_ussd.tasks.metadata.edit_user_metadata.apply_async')
edit_user_metadata_attribute(state_machine_data=state_machine_data)
person_metadata['location']['area_name'] = 'nairobi'
mocked_edit_metadata.assert_called_with(
(create_activated_user.blockchain_address, person_metadata, Chain.spec.engine()),
{},
queue='cic-ussd'
)
def test_get_user_metadata_attribute(celery_session_worker,
create_activated_user,
create_in_redis_ussd_session,
mocker,
ussd_session_data):
ussd_session = create_in_redis_ussd_session.get(ussd_session_data.get('external_session_id'))
ussd_session = json.loads(ussd_session)
state_machine_data = ('', ussd_session, create_activated_user)
mocked_get_metadata = mocker.patch('cic_ussd.tasks.metadata.query_user_metadata.apply_async')
get_user_metadata(state_machine_data=state_machine_data)
mocked_get_metadata.assert_called_with(
(create_activated_user.blockchain_address,),
{},
queue='cic-ussd'
)

View File

@@ -0,0 +1,55 @@
# standard imports
import json
# third-party imports
import pytest
from cic_types.models.person import generate_metadata_pointer
# local imports
from cic_ussd.metadata import blockchain_address_to_metadata_pointer
from cic_ussd.redis import cache_data
from cic_ussd.state_machine.logic.validator import (is_valid_name,
is_valid_gender_selection,
has_cached_user_metadata)
@pytest.mark.parametrize("user_input, expected_result", [
("Arya", True),
("1234", False)
])
def test_is_valid_name(create_in_db_ussd_session, create_pending_user, user_input, expected_result):
serialized_in_db_ussd_session = create_in_db_ussd_session.to_json()
state_machine_data = (user_input, serialized_in_db_ussd_session, create_pending_user)
result = is_valid_name(state_machine_data=state_machine_data)
assert result is expected_result
def test_has_cached_user_metadata(create_in_db_ussd_session,
create_activated_user,
init_redis_cache,
person_metadata):
serialized_in_db_ussd_session = create_in_db_ussd_session.to_json()
state_machine_data = ('', serialized_in_db_ussd_session, create_activated_user)
result = has_cached_user_metadata(state_machine_data=state_machine_data)
assert result is False
# cache metadata
user = create_activated_user
key = generate_metadata_pointer(
identifier=blockchain_address_to_metadata_pointer(blockchain_address=user.blockchain_address),
cic_type='cic.person'
)
cache_data(key=key, data=json.dumps(person_metadata))
result = has_cached_user_metadata(state_machine_data=state_machine_data)
assert result
@pytest.mark.parametrize("user_input, expected_result", [
("1", True),
("2", True),
("3", False)
])
def test_is_valid_gender_selection(create_in_db_ussd_session, create_pending_user, user_input, expected_result):
serialized_in_db_ussd_session = create_in_db_ussd_session.to_json()
state_machine_data = (user_input, serialized_in_db_ussd_session, create_pending_user)
result = is_valid_gender_selection(state_machine_data=state_machine_data)
assert result is expected_result

View File

@@ -10,7 +10,7 @@ import pytest
# local imports
from cic_ussd.db.models.user import User
from cic_ussd.error import ActionDataNotFoundError
from cic_ussd.transactions import from_wei
from cic_ussd.conversions import from_wei
logg = logging.getLogger()
@@ -155,6 +155,7 @@ def test_unsuccessful_incoming_transaction_recipient_not_found(celery_session_wo
def test_successful_incoming_transaction_sender_not_found(caplog,
celery_session_worker,
create_valid_tx_recipient,
mock_notifier_api,
successful_incoming_transfer_callback):
result = successful_incoming_transfer_callback.get('RESULT')
param = successful_incoming_transfer_callback.get('PARAM')

View File

@@ -15,7 +15,7 @@ def test_persist_session_to_db_task(
create_in_redis_ussd_session):
external_session_id = ussd_session_data.get('external_session_id')
s_persist_session_to_db = celery.signature(
'cic_ussd.tasks.ussd.persist_session_to_db',
'cic_ussd.tasks.ussd_session.persist_session_to_db',
[external_session_id]
)
result = s_persist_session_to_db.apply_async()
@@ -38,7 +38,7 @@ def test_session_not_found_error(
with pytest.raises(SessionNotFoundError) as error:
external_session_id = 'SomeRandomValue'
s_persist_session_to_db = celery.signature(
'cic_ussd.tasks.ussd.persist_session_to_db',
'cic_ussd.tasks.ussd_session.persist_session_to_db',
[external_session_id]
)
result = s_persist_session_to_db.apply_async()

View File

@@ -0,0 +1,20 @@
# standard imports
# third-party imports
# local imports
from cic_ussd.balance import BalanceManager
from cic_ussd.chain import Chain
def test_balance_manager(create_valid_tx_recipient, load_config, mocker, setup_chain_spec):
chain_str = Chain.spec.__str__()
balance_manager = BalanceManager(
address=create_valid_tx_recipient.blockchain_address,
chain_str=chain_str,
token_symbol='SRF'
)
balance_manager.get_balances = mocker.MagicMock()
balance_manager.get_balances()
balance_manager.get_balances.assert_called_once()

View File

@@ -18,6 +18,7 @@ def test_send_sms_notification(celery_session_worker,
recipient,
set_locale_files,
mock_notifier_api):
notifier = Notifier()
notifier.queue = None
@@ -27,3 +28,9 @@ def test_send_sms_notification(celery_session_worker,
assert messages[0].get('message') == expected_message
assert messages[0].get('recipient') == recipient

View File

@@ -6,6 +6,7 @@ import uuid
import pytest
# local imports
from cic_ussd.chain import Chain
from cic_ussd.db.models.task_tracker import TaskTracker
from cic_ussd.menu.ussd_menu import UssdMenu
from cic_ussd.operations import (add_tasks_to_tracker,
@@ -17,13 +18,12 @@ from cic_ussd.operations import (add_tasks_to_tracker,
get_latest_input,
initiate_account_creation_request,
process_current_menu,
process_phone_number,
process_menu_interaction_requests,
cache_account_creation_task_id,
get_user_by_phone_number,
reset_pin,
update_ussd_session,
save_to_in_memory_ussd_session_data)
from cic_ussd.phone_number import get_user_by_phone_number,process_phone_number
from cic_ussd.transactions import truncate
from cic_ussd.redis import InMemoryStore
from cic_ussd.session.ussd_session import UssdSession as InMemoryUssdSession
@@ -99,6 +99,7 @@ def test_initiate_account_creation_request(account_creation_action_data,
load_config,
load_ussd_menu,
mocker,
setup_chain_spec,
set_locale_files,
ussd_session_data):
external_session_id = ussd_session_data.get('external_session_id')
@@ -112,7 +113,8 @@ def test_initiate_account_creation_request(account_creation_action_data,
mocked_cache_function = mocker.patch('cic_ussd.operations.cache_account_creation_task_id')
mocked_cache_function(phone_number, task_id)
response = initiate_account_creation_request(chain_str=load_config.get('CIC_CHAIN_SPEC'),
chain_str = Chain.spec.__str__()
response = initiate_account_creation_request(chain_str=chain_str,
external_session_id=external_session_id,
phone_number=ussd_session_data.get('msisdn'),
service_code=ussd_session_data.get('service_code'),
@@ -204,11 +206,13 @@ def test_process_menu_interaction_requests(external_session_id,
load_ussd_menu,
load_data_into_state_machine,
load_config,
setup_chain_spec,
celery_session_worker,
create_activated_user,
create_in_db_ussd_session):
chain_str = Chain.spec.__str__()
response = process_menu_interaction_requests(
chain_str=load_config.get('CIC_CHAIN_SPEC'),
chain_str=chain_str,
external_session_id=external_session_id,
phone_number=phone_number,
queue='cic-ussd',

View File

@@ -12,7 +12,7 @@ from cic_ussd.processor import (custom_display_text,
def test_process_pin_authorization(create_activated_user,
load_ussd_menu,
set_locale_files):
ussd_menu = UssdMenu.find_by_name(name='name_management_pin_authorization')
ussd_menu = UssdMenu.find_by_name(name='standard_pin_authorization')
response = process_pin_authorization(
display_key=ussd_menu.get('display_key'),
user=create_activated_user

View File

@@ -4,6 +4,7 @@
import pytest
# local imports
from cic_ussd.chain import Chain
from cic_ussd.transactions import OutgoingTransactionProcessor, truncate
@@ -11,8 +12,9 @@ def test_outgoing_transaction_processor(load_config,
create_valid_tx_recipient,
create_valid_tx_sender,
mock_outgoing_transactions):
chain_str = Chain.spec.__str__()
outgoing_tx_processor = OutgoingTransactionProcessor(
chain_str=load_config.get('CIC_CHAIN_SPEC'),
chain_str=chain_str,
from_address=create_valid_tx_sender.blockchain_address,
to_address=create_valid_tx_recipient.blockchain_address
)

View File

@@ -1,3 +1,7 @@
# third-party imports
from cic_types.pytest import *
# local imports
from tests.fixtures.config import *
from tests.fixtures.db import *
@@ -8,41 +12,3 @@ from tests.fixtures.redis import *
from tests.fixtures.callback import *
from tests.fixtures.requests import *
from tests.fixtures.mocks import *

View File

@@ -12,7 +12,8 @@ def celery_includes():
return [
'cic_ussd.tasks.ussd',
'cic_ussd.tasks.callback_handler',
'cic_notify.tasks.sms'
'cic_notify.tasks.sms',
'cic_ussd.tasks.metadata'
]

Some files were not shown because too many files have changed in this diff Show More