Compare commits

..

34 Commits

Author SHA1 Message Date
46de0401ec add a health check to ussd 2021-08-09 11:43:47 -07:00
b7942ddcfa Merge branch 'philip/add-docker-compose-default' into 'master'
Adds password pepper value defaults.

See merge request grassrootseconomics/cic-internal-integration!242
2021-08-06 16:58:37 +00:00
8de5dc1540 Adds password pepper value defaults. 2021-08-06 19:53:36 +03:00
fad0a4b580 Merge branch 'philip/the-great-bump' into 'master'
The great bump

See merge request grassrootseconomics/cic-internal-integration!239
2021-08-06 16:29:01 +00:00
0672a17d2e The great bump 2021-08-06 16:29:01 +00:00
f764b73f66 Testing a whitespace change for deployment 2021-07-30 17:05:25 +00:00
806b82504f Merge branch 'bvander/fix-notify-migrations' into 'master'
Bvander/fix notify migrations

See merge request grassrootseconomics/cic-internal-integration!238
2021-07-29 18:29:17 +00:00
ac76e14129 Bvander/fix notify migrations 2021-07-29 18:29:17 +00:00
1c78f4d6d6 Merge branch 'bvander/fix-notify-migrations' into 'master'
migrations fix

See merge request grassrootseconomics/cic-internal-integration!237
2021-07-28 23:33:34 +00:00
0d6e228f8a migrations fix 2021-07-28 23:33:34 +00:00
7a3cb7ab75 Merge branch 'philip/notify-fixes' into 'master'
Philip/notify fixes

See merge request grassrootseconomics/cic-internal-integration!234
2021-07-23 15:15:30 +00:00
992c7b4022 Philip/notify fixes 2021-07-23 15:15:30 +00:00
f19173001e Merge branch 'lash/chainlib-nextgen' into 'master'
Implement nextgen chainlib and chainqueue upgrade

See merge request grassrootseconomics/cic-internal-integration!231
2021-07-21 17:34:51 +00:00
Louis Holbrook
f82bb4515d Implement nextgen chainlib and chainqueue upgrade 2021-07-21 17:34:51 +00:00
24e6db7d87 Merge branch 'philip/sms-cluster-issues' into 'master'
Censors sensitive config values.

See merge request grassrootseconomics/cic-internal-integration!209
2021-07-20 16:18:27 +00:00
ecdfb9bc5a Censors sensitive config values. 2021-07-20 16:18:27 +00:00
30415ac997 Merge branch 'bvander/data-seeding-profiles' into 'master'
e2e ussd import user scripts

See merge request grassrootseconomics/cic-internal-integration!230
2021-07-19 21:30:05 +00:00
d5a8b77349 e2e ussd import user scripts 2021-07-19 21:30:04 +00:00
Louis Holbrook
ed2521b582 Merge branch 'lash/migration-fix' into 'master'
Apply config changes to contract migration

See merge request grassrootseconomics/cic-internal-integration!229
2021-07-16 13:21:08 +00:00
Louis Holbrook
395930106a Apply config changes to contract migration 2021-07-16 13:21:07 +00:00
Louis Holbrook
ee1452e530 Merge branch 'lash/fix-ussd-impotrs' into 'master'
Make verify pass in ussd imports

See merge request grassrootseconomics/cic-internal-integration!228
2021-07-16 09:30:23 +00:00
nolash
8cdaf9f28a Fix confini instantiation in ussd 2021-07-15 00:54:48 +02:00
Louis Holbrook
402b968b6d Merge branch 'lash/dispatcher-leak' into 'master'
Fix dispatcher memory leak

See merge request grassrootseconomics/cic-internal-integration!220
2021-07-14 22:02:59 +00:00
Louis Holbrook
aa13517534 Fix dispatcher memory leak 2021-07-14 22:02:59 +00:00
Louis Holbrook
884b18f2f1 Merge branch 'lash/expand-tokens-in-list' into 'master'
Expand token symbol and decimals in tx listings

See merge request grassrootseconomics/cic-internal-integration!226
2021-07-14 15:06:20 +00:00
Louis Holbrook
494a8f3e88 Expand token symbol and decimals in tx listings 2021-07-14 15:06:20 +00:00
Geoff Turk
1214f605a7 Merge branch 'geoffturk/fix-geo-range' into 'master'
Fix out of range error for geolocation in import users script

See merge request grassrootseconomics/cic-internal-integration!225
2021-07-14 14:52:40 +00:00
Louis Holbrook
0783a6001c Merge branch 'lash/eth-abi-exorcism' into 'master'
Remove eth-abi dep in data seeding

See merge request grassrootseconomics/cic-internal-integration!224
2021-07-14 14:43:33 +00:00
Louis Holbrook
f9594b766a Remove eth-abi dep in data seeding 2021-07-14 14:43:32 +00:00
geoffturk
561ae62d5e Fix out of range error for geolocation in import users script 2021-07-13 09:42:53 +00:00
Louis Holbrook
d6782abbcc Merge branch 'lash/default-token-context' into 'master'
Add contextual token symbol and name defaults

Closes #85

See merge request grassrootseconomics/cic-internal-integration!219
2021-07-12 19:50:48 +00:00
Louis Holbrook
8f173fa30b Add contextual token symbol and name defaults 2021-07-12 19:50:48 +00:00
Louis Holbrook
3741cb3283 Merge branch 'lash/account-registry-filter-match' into 'master'
Only match account registry on correct contract address

Closes cic-eth#127

See merge request grassrootseconomics/cic-internal-integration!218
2021-07-12 19:50:32 +00:00
Louis Holbrook
54dd5acb62 Only match account registry on correct contract address 2021-07-12 19:50:32 +00:00
271 changed files with 6682 additions and 7987 deletions

View File

@@ -100,3 +100,4 @@ class SessionBase(Model):
logg.debug('destroying session {}'.format(session_key))
session.commit()
session.close()
del SessionBase.localsessions[session_key]

View File

@@ -1,13 +1,13 @@
cic-base==0.1.3a3+build.984b5cff
cic-base~=0.2.0a4
alembic==1.4.2
confini~=0.3.6rc3
confini>=0.3.6rc3,<0.5.0
uwsgi==2.0.19.1
moolb~=0.1.0
cic-eth-registry~=0.5.6a1
cic-eth-registry~=0.5.6a2
SQLAlchemy==1.3.20
semver==2.13.0
psycopg2==2.8.6
celery==4.4.7
redis==3.5.3
chainsyncer[sql]~=0.0.3a3
erc20-faucet~=0.2.2a1
chainsyncer[sql]~=0.0.3a5
erc20-faucet~=0.2.2a2

View File

@@ -1,5 +1,5 @@
SQLAlchemy==1.3.20
cic-eth-registry~=0.5.6a1
cic-eth-registry>=0.5.6a2,<0.6.0
hexathon~=0.0.1a7
chainqueue~=0.0.2b5
eth-erc20==0.0.10a2
chainqueue>=0.0.3a1,<0.1.0
eth-erc20>=0.0.10a3,<0.1.0

View File

@@ -6,6 +6,11 @@ import logging
import celery
from chainlib.eth.constant import ZERO_ADDRESS
from chainlib.chain import ChainSpec
from hexathon import (
add_0x,
strip_0x,
uniform as hex_uniform,
)
# local imports
from cic_eth.db.enum import LockEnum
@@ -19,6 +24,12 @@ from cic_eth.error import LockedError
celery_app = celery.current_app
logg = logging.getLogger()
def normalize_address(a):
if a == None:
return None
return add_0x(hex_uniform(strip_0x(a)))
@celery_app.task(base=CriticalSQLAlchemyTask)
def lock(chained_input, chain_spec_dict, address=ZERO_ADDRESS, flags=LockEnum.ALL, tx_hash=None):
"""Task wrapper to set arbitrary locks
@@ -32,6 +43,7 @@ def lock(chained_input, chain_spec_dict, address=ZERO_ADDRESS, flags=LockEnum.AL
:returns: New lock state for address
:rtype: number
"""
address = normalize_address(address)
chain_str = '::'
if chain_spec_dict != None:
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
@@ -53,6 +65,7 @@ def unlock(chained_input, chain_spec_dict, address=ZERO_ADDRESS, flags=LockEnum.
:returns: New lock state for address
:rtype: number
"""
address = normalize_address(address)
chain_str = '::'
if chain_spec_dict != None:
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
@@ -72,6 +85,7 @@ def lock_send(chained_input, chain_spec_dict, address=ZERO_ADDRESS, tx_hash=None
:returns: New lock state for address
:rtype: number
"""
address = normalize_address(address)
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
r = Lock.set(chain_str, LockEnum.SEND, address=address, tx_hash=tx_hash)
logg.debug('Send locked for {}, flag now {}'.format(address, r))
@@ -89,6 +103,7 @@ def unlock_send(chained_input, chain_spec_dict, address=ZERO_ADDRESS):
:returns: New lock state for address
:rtype: number
"""
address = normalize_address(address)
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
r = Lock.reset(chain_str, LockEnum.SEND, address=address)
logg.debug('Send unlocked for {}, flag now {}'.format(address, r))
@@ -106,6 +121,7 @@ def lock_queue(chained_input, chain_spec_dict, address=ZERO_ADDRESS, tx_hash=Non
:returns: New lock state for address
:rtype: number
"""
address = normalize_address(address)
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
r = Lock.set(chain_str, LockEnum.QUEUE, address=address, tx_hash=tx_hash)
logg.debug('Queue direct locked for {}, flag now {}'.format(address, r))
@@ -123,6 +139,7 @@ def unlock_queue(chained_input, chain_spec_dict, address=ZERO_ADDRESS):
:returns: New lock state for address
:rtype: number
"""
address = normalize_address(address)
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
r = Lock.reset(chain_str, LockEnum.QUEUE, address=address)
logg.debug('Queue direct unlocked for {}, flag now {}'.format(address, r))
@@ -131,6 +148,7 @@ def unlock_queue(chained_input, chain_spec_dict, address=ZERO_ADDRESS):
@celery_app.task(base=CriticalSQLAlchemyTask)
def check_lock(chained_input, chain_spec_dict, lock_flags, address=None):
address = normalize_address(address)
chain_str = '::'
if chain_spec_dict != None:
chain_str = str(ChainSpec.from_dict(chain_spec_dict))

View File

@@ -14,7 +14,11 @@ from chainqueue.sql.query import get_tx
from chainqueue.sql.state import set_cancel
from chainqueue.db.models.otx import Otx
from chainqueue.db.models.tx import TxCache
from hexathon import strip_0x
from hexathon import (
strip_0x,
add_0x,
uniform as hex_uniform,
)
from potaahto.symbols import snake_and_camel
# local imports
@@ -69,15 +73,17 @@ def shift_nonce(self, chainspec_dict, tx_hash_orig_hex, delta=1):
set_cancel(chain_spec, strip_0x(tx['hash']), manual=True, session=session)
query_address = add_0x(hex_uniform(strip_0x(address))) # aaaaargh
q = session.query(Otx)
q = q.join(TxCache)
q = q.filter(TxCache.sender==address)
q = q.filter(TxCache.sender==query_address)
q = q.filter(Otx.nonce>=nonce+delta)
q = q.order_by(Otx.nonce.asc())
otxs = q.all()
tx_hashes = []
txs = []
gas_total = 0
for otx in otxs:
tx_raw = bytes.fromhex(strip_0x(otx.signed_tx))
tx_new = unpack(tx_raw, chain_spec)
@@ -89,8 +95,10 @@ def shift_nonce(self, chainspec_dict, tx_hash_orig_hex, delta=1):
tx_new['gas_price'] += 1
tx_new['gasPrice'] = tx_new['gas_price']
tx_new['nonce'] -= delta
gas_total += tx_new['gas_price'] * tx_new['gas']
logg.debug('tx_new {}'.format(tx_new))
logg.debug('gas running total {}'.format(gas_total))
del(tx_new['hash'])
del(tx_new['hash_unsigned'])
@@ -122,8 +130,10 @@ def shift_nonce(self, chainspec_dict, tx_hash_orig_hex, delta=1):
s = create_check_gas_task(
txs,
chain_spec,
tx_new['from'],
gas=tx_new['gas'],
#tx_new['from'],
address,
#gas=tx_new['gas'],
gas=gas_total,
tx_hashes_hex=tx_hashes,
queue=queue,
)
@@ -132,7 +142,8 @@ def shift_nonce(self, chainspec_dict, tx_hash_orig_hex, delta=1):
'cic_eth.admin.ctrl.unlock_send',
[
chain_spec.asdict(),
tx_new['from'],
address,
#tx_new['from'],
],
queue=queue,
)
@@ -140,7 +151,8 @@ def shift_nonce(self, chainspec_dict, tx_hash_orig_hex, delta=1):
'cic_eth.admin.ctrl.unlock_queue',
[
chain_spec.asdict(),
tx_new['from'],
address,
#tx_new['from'],
],
queue=queue,
)

View File

@@ -21,6 +21,7 @@ from chainlib.hash import keccak256_hex_to_hex
from hexathon import (
strip_0x,
add_0x,
uniform as hex_uniform,
)
from chainlib.eth.gas import balance
from chainqueue.db.enum import (
@@ -307,6 +308,8 @@ class AdminApi:
:param address: Ethereum address to return transactions for
:type address: str, 0x-hex
"""
address = add_0x(hex_uniform(strip_0x(address)))
last_nonce = -1
s = celery.signature(
'cic_eth.queue.query.get_account_tx',

View File

@@ -8,7 +8,8 @@ Create Date: 2021-04-02 18:30:55.398388
from alembic import op
import sqlalchemy as sa
from chainqueue.db.migrations.sqlalchemy import (
#from chainqueue.db.migrations.sqlalchemy import (
from chainqueue.db.migrations.default.export import (
chainqueue_upgrade,
chainqueue_downgrade,
)

View File

@@ -8,7 +8,8 @@ Create Date: 2021-04-02 18:36:44.459603
from alembic import op
import sqlalchemy as sa
from chainsyncer.db.migrations.sqlalchemy import (
#from chainsyncer.db.migrations.sqlalchemy import (
from chainsyncer.db.migrations.default.export import (
chainsyncer_upgrade,
chainsyncer_downgrade,
)

View File

@@ -126,3 +126,4 @@ class SessionBase(Model):
logg.debug('commit and destroy session {}'.format(session_key))
session.commit()
session.close()
del SessionBase.localsessions[session_key]

View File

@@ -23,7 +23,7 @@ from chainlib.error import JSONRPCException
from eth_accounts_index.registry import AccountRegistry
from eth_accounts_index import AccountsIndex
from sarafu_faucet import MinterFaucet
from chainqueue.db.models.tx import TxCache
from chainqueue.sql.tx import cache_tx_dict
# local import
from cic_eth_registry import CICRegistry
@@ -300,20 +300,17 @@ def cache_gift_data(
session = self.create_session()
tx_cache = TxCache(
tx_hash_hex,
tx['from'],
tx['to'],
ZERO_ADDRESS,
ZERO_ADDRESS,
0,
0,
session=session,
)
tx_dict = {
'hash': tx_hash_hex,
'from': tx['from'],
'to': tx['to'],
'source_token': ZERO_ADDRESS,
'destination_token': ZERO_ADDRESS,
'from_value': 0,
'to_value': 0,
}
session.add(tx_cache)
session.commit()
cache_id = tx_cache.id
(tx_dict, cache_id) = cache_tx_dict(tx_dict, session=session)
session.close()
return (tx_hash_hex, cache_id)
@@ -342,18 +339,15 @@ def cache_account_data(
tx_data = AccountsIndex.parse_add_request(tx['data'])
session = SessionBase.create_session()
tx_cache = TxCache(
tx_hash_hex,
tx['from'],
tx['to'],
ZERO_ADDRESS,
ZERO_ADDRESS,
0,
0,
session=session,
)
session.add(tx_cache)
session.commit()
cache_id = tx_cache.id
tx_dict = {
'hash': tx_hash_hex,
'from': tx['from'],
'to': tx['to'],
'source_token': ZERO_ADDRESS,
'destination_token': ZERO_ADDRESS,
'from_value': 0,
'to_value': 0,
}
(tx_dict, cache_id) = cache_tx_dict(tx_dict, session=session)
session.close()
return (tx_hash_hex, cache_id)

View File

@@ -1,385 +0,0 @@
# standard imports
import os
import logging
# third-party imports
import celery
import web3
from cic_registry import CICRegistry
from cic_registry.chain import ChainSpec
# local imports
from cic_eth.db import SessionBase
from cic_eth.db.models.convert import TxConvertTransfer
from cic_eth.db.models.otx import Otx
from cic_eth.db.models.tx import TxCache
from cic_eth.eth.task import sign_and_register_tx
from cic_eth.eth.task import create_check_gas_and_send_task
from cic_eth.eth.token import TokenTxFactory
from cic_eth.eth.factory import TxFactory
from cic_eth.eth.util import unpack_signed_raw_tx
from cic_eth.eth.rpc import RpcClient
celery_app = celery.current_app
#logg = celery_app.log.get_default_logger()
logg = logging.getLogger()
contract_function_signatures = {
'convert': 'f3898a97',
'convert2': '569706eb',
}
class BancorTxFactory(TxFactory):
"""Factory for creating Bancor network transactions.
"""
def convert(
self,
source_token_address,
destination_token_address,
reserve_address,
source_amount,
minimum_return,
chain_spec,
fee_beneficiary='0x0000000000000000000000000000000000000000',
fee_ppm=0,
):
"""Create a BancorNetwork "convert" transaction.
:param source_token_address: ERC20 contract address for token to convert from
:type source_token_address: str, 0x-hex
:param destination_token_address: ERC20 contract address for token to convert to
:type destination_token_address: str, 0x-hex
:param reserve_address: ERC20 contract address of Common reserve token
:type reserve_address: str, 0x-hex
:param source_amount: Amount of source tokens to convert
:type source_amount: int
:param minimum_return: Minimum amount of destination tokens to accept as result for conversion
:type source_amount: int
:return: Unsigned "convert" transaction in standard Ethereum format
:rtype: dict
"""
network_contract = CICRegistry.get_contract(chain_spec, 'BancorNetwork')
network_gas = network_contract.gas('convert')
tx_convert_buildable = network_contract.contract.functions.convert2(
[
source_token_address,
source_token_address,
reserve_address,
destination_token_address,
destination_token_address,
],
source_amount,
minimum_return,
fee_beneficiary,
fee_ppm,
)
tx_convert = tx_convert_buildable.buildTransaction({
'from': self.address,
'gas': network_gas,
'gasPrice': self.gas_price,
'chainId': chain_spec.chain_id(),
'nonce': self.next_nonce(),
})
return tx_convert
def unpack_convert(data):
f = data[2:10]
if f != contract_function_signatures['convert2']:
raise ValueError('Invalid convert data ({})'.format(f))
d = data[10:]
path = d[384:]
source = path[64-40:64]
destination = path[-40:]
amount = int(d[64:128], 16)
min_return = int(d[128:192], 16)
fee_recipient = d[192:256]
fee = int(d[256:320], 16)
return {
'amount': amount,
'min_return': min_return,
'source_token': web3.Web3.toChecksumAddress('0x' + source),
'destination_token': web3.Web3.toChecksumAddress('0x' + destination),
'fee_recipient': fee_recipient,
'fee': fee,
}
# Kept for historical reference, it unpacks a convert call without fee parameters
#def _unpack_convert_mint(data):
# f = data[2:10]
# if f != contract_function_signatures['convert2']:
# raise ValueError('Invalid convert data ({})'.format(f))
#
# d = data[10:]
# path = d[256:]
# source = path[64-40:64]
# destination = path[-40:]
#
# amount = int(d[64:128], 16)
# min_return = int(d[128:192], 16)
# return {
# 'amount': amount,
# 'min_return': min_return,
# 'source_token': web3.Web3.toChecksumAddress('0x' + source),
# 'destination_token': web3.Web3.toChecksumAddress('0x' + destination),
# }
@celery_app.task(bind=True)
def convert_with_default_reserve(self, tokens, from_address, source_amount, minimum_return, to_address, chain_str):
"""Performs a conversion between two liquid tokens using Bancor network.
:param tokens: Token pair, source and destination respectively
:type tokens: list of str, 0x-hex
:param from_address: Ethereum address of sender
:type from_address: str, 0x-hex
:param source_amount: Amount of source tokens to convert
:type source_amount: int
:param minimum_return: Minimum about of destination tokens to receive
:type minimum_return: int
"""
chain_spec = ChainSpec.from_chain_str(chain_str)
queue = self.request.delivery_info['routing_key']
c = RpcClient(chain_spec, holder_address=from_address)
cr = CICRegistry.get_contract(chain_spec, 'BancorNetwork')
source_token = CICRegistry.get_address(chain_spec, tokens[0]['address'])
reserve_address = CICRegistry.get_contract(chain_spec, 'BNTToken', 'ERC20').address()
tx_factory = TokenTxFactory(from_address, c)
tx_approve_zero = tx_factory.approve(source_token.address(), cr.address(), 0, chain_spec)
(tx_approve_zero_hash_hex, tx_approve_zero_signed_hex) = sign_and_register_tx(tx_approve_zero, chain_str, queue, 'cic_eth.eth.token.otx_cache_approve')
tx_approve = tx_factory.approve(source_token.address(), cr.address(), source_amount, chain_spec)
(tx_approve_hash_hex, tx_approve_signed_hex) = sign_and_register_tx(tx_approve, chain_str, queue, 'cic_eth.eth.token.otx_cache_approve')
tx_factory = BancorTxFactory(from_address, c)
tx_convert = tx_factory.convert(
tokens[0]['address'],
tokens[1]['address'],
reserve_address,
source_amount,
minimum_return,
chain_spec,
)
(tx_convert_hash_hex, tx_convert_signed_hex) = sign_and_register_tx(tx_convert, chain_str, queue, 'cic_eth.eth.bancor.otx_cache_convert')
# TODO: consider moving save recipient to async task / chain it before the tx send
if to_address != None:
save_convert_recipient(tx_convert_hash_hex, to_address, chain_str)
s = create_check_gas_and_send_task(
[tx_approve_zero_signed_hex, tx_approve_signed_hex, tx_convert_signed_hex],
chain_str,
from_address,
tx_approve_zero['gasPrice'] * tx_approve_zero['gas'],
tx_hashes_hex=[tx_approve_hash_hex],
queue=queue,
)
s.apply_async()
return tx_convert_hash_hex
#@celery_app.task()
#def process_approval(tx_hash_hex):
# t = session.query(TxConvertTransfer).query(TxConvertTransfer.approve_tx_hash==tx_hash_hex).first()
# c = session.query(Otx).query(Otx.tx_hash==t.convert_tx_hash)
# gas_limit = 8000000
# gas_price = GasOracle.gas_price()
#
# # TODO: use celery group instead
# s_queue = celery.signature(
# 'cic_eth.queue.tx.create',
# [
# nonce,
# c['address'], # TODO: check that this is in fact sender address
# c['tx_hash'],
# c['signed_tx'],
# ]
# )
# s_queue.apply_async()
#
# s_check_gas = celery.signature(
# 'cic_eth.eth.gas.check_gas',
# [
# c['address'],
# [c['signed_tx']],
# gas_limit * gas_price,
# ]
# )
# s_send = celery.signature(
# 'cic_eth.eth.tx.send',
# [],
# )
#
# s_set_sent = celery.signature(
# 'cic_eth.queue.state.set_sent',
# [False],
# )
# s_send.link(s_set_sent)
# s_check_gas.link(s_send)
# s_check_gas.apply_async()
# return tx_hash_hex
@celery_app.task()
def save_convert_recipient(convert_hash, recipient_address, chain_str):
"""Registers the recipient target for a convert-and-transfer operation.
:param convert_hash: Transaction hash of convert operation
:type convert_hash: str, 0x-hex
:param recipient_address: Address of consequtive transfer recipient
:type recipient_address: str, 0x-hex
"""
session = SessionBase.create_session()
t = TxConvertTransfer(convert_hash, recipient_address, chain_str)
session.add(t)
session.commit()
session.close()
@celery_app.task()
def save_convert_transfer(convert_hash, transfer_hash):
"""Registers that the transfer part of a convert-and-transfer operation has been executed.
:param convert_hash: Transaction hash of convert operation
:type convert_hash: str, 0x-hex
:param convert_hash: Transaction hash of transfer operation
:type convert_hash: str, 0x-hex
:returns: transfer_hash,
:rtype: list, single str, 0x-hex
"""
session = SessionBase.create_session()
t = TxConvertTransfer.get(convert_hash)
t.transfer(transfer_hash)
session.add(t)
session.commit()
session.close()
return [transfer_hash]
# TODO: seems unused, consider removing
@celery_app.task()
def resolve_converters_by_tokens(tokens, chain_str):
"""Return converters for a list of tokens.
:param tokens: Token addresses to look up
:type tokens: list of str, 0x-hex
:return: Addresses of matching converters
:rtype: list of str, 0x-hex
"""
chain_spec = ChainSpec.from_chain_str(chain_str)
for t in tokens:
c = CICRegistry.get_contract(chain_spec, 'ConverterRegistry')
fn = c.function('getConvertersByAnchors')
try:
converters = fn([t['address']]).call()
except Exception as e:
raise e
t['converters'] = converters
return tokens
@celery_app.task(bind=True)
def transfer_converted(self, tokens, holder_address, receiver_address, value, tx_convert_hash_hex, chain_str):
"""Execute the ERC20 transfer of a convert-and-transfer operation.
First argument is a list of tokens, to enable the task to be chained to the symbol to token address resolver function. However, it accepts only one token as argument.
:param tokens: Token addresses
:type tokens: list of str, 0x-hex
:param holder_address: Token holder address
:type holder_address: str, 0x-hex
:param holder_address: Token receiver address
:type holder_address: str, 0x-hex
:param value: Amount of token, in 'wei'
:type value: int
:raises TokenCountError: Either none or more then one tokens have been passed as tokens argument
:return: Transaction hash
:rtype: str, 0x-hex
"""
# we only allow one token, one transfer
if len(tokens) != 1:
raise TokenCountError
chain_spec = ChainSpec.from_chain_str(chain_str)
queue = self.request.delivery_info['routing_key']
c = RpcClient(chain_spec, holder_address=holder_address)
# get transaction parameters
gas_price = c.gas_price()
tx_factory = TokenTxFactory(holder_address, c)
token_address = tokens[0]['address']
tx_transfer = tx_factory.transfer(
token_address,
receiver_address,
value,
chain_spec,
)
(tx_transfer_hash_hex, tx_transfer_signed_hex) = sign_and_register_tx(tx_transfer, chain_str, queue, 'cic_eth.eth.token.otx_cache_transfer')
# send transaction
logg.info('transfer converted token {} from {} to {} value {} {}'.format(token_address, holder_address, receiver_address, value, tx_transfer_signed_hex))
s = create_check_gas_and_send_task(
[tx_transfer_signed_hex],
chain_str,
holder_address,
tx_transfer['gasPrice'] * tx_transfer['gas'],
None,
queue,
)
s_save = celery.signature(
'cic_eth.eth.bancor.save_convert_transfer',
[
tx_convert_hash_hex,
tx_transfer_hash_hex,
],
queue=queue,
)
s_save.link(s)
s_save.apply_async()
return tx_transfer_hash_hex
@celery_app.task()
def otx_cache_convert(
tx_hash_hex,
tx_signed_raw_hex,
chain_str,
):
chain_spec = ChainSpec.from_chain_str(chain_str)
tx_signed_raw_bytes = bytes.fromhex(tx_signed_raw_hex[2:])
tx = unpack(tx_signed_raw_bytes, chain_spec)
tx_data = unpack_convert(tx['data'])
logg.debug('tx data {}'.format(tx_data))
session = TxCache.create_session()
tx_cache = TxCache(
tx_hash_hex,
tx['from'],
tx['from'],
tx_data['source_token'],
tx_data['destination_token'],
tx_data['amount'],
tx_data['amount'],
)
session.add(tx_cache)
session.commit()
session.close()
return tx_hash_hex

View File

@@ -13,9 +13,9 @@ from chainlib.eth.tx import (
from cic_eth_registry import CICRegistry
from cic_eth_registry.erc20 import ERC20Token
from hexathon import strip_0x
from chainqueue.db.models.tx import TxCache
from chainqueue.error import NotLocalTxError
from eth_erc20 import ERC20
from chainqueue.sql.tx import cache_tx_dict
# local imports
from cic_eth.db.models.base import SessionBase
@@ -375,19 +375,16 @@ def cache_transfer_data(
token_value = tx_data[1]
session = SessionBase.create_session()
tx_cache = TxCache(
tx_hash_hex,
tx['from'],
recipient_address,
tx['to'],
tx['to'],
token_value,
token_value,
session=session,
)
session.add(tx_cache)
session.commit()
cache_id = tx_cache.id
tx_dict = {
'hash': tx_hash_hex,
'from': tx['from'],
'to': recipient_address,
'source_token': tx['to'],
'destination_token': tx['to'],
'from_value': token_value,
'to_value': token_value,
}
(tx_dict, cache_id) = cache_tx_dict(tx_dict, session=session)
session.close()
return (tx_hash_hex, cache_id)
@@ -417,19 +414,16 @@ def cache_transfer_from_data(
token_value = tx_data[2]
session = SessionBase.create_session()
tx_cache = TxCache(
tx_hash_hex,
tx['from'],
recipient_address,
tx['to'],
tx['to'],
token_value,
token_value,
session=session,
)
session.add(tx_cache)
session.commit()
cache_id = tx_cache.id
tx_dict = {
'hash': tx_hash_hex,
'from': tx['from'],
'to': recipient_address,
'source_token': tx['to'],
'destination_token': tx['to'],
'from_value': token_value,
'to_value': token_value,
}
(tx_dict, cache_id) = cache_tx_dict(tx_dict, session=session)
session.close()
return (tx_hash_hex, cache_id)
@@ -458,19 +452,16 @@ def cache_approve_data(
token_value = tx_data[1]
session = SessionBase.create_session()
tx_cache = TxCache(
tx_hash_hex,
tx['from'],
recipient_address,
tx['to'],
tx['to'],
token_value,
token_value,
session=session,
)
session.add(tx_cache)
session.commit()
cache_id = tx_cache.id
tx_dict = {
'hash': tx_hash_hex,
'from': tx['from'],
'to': recipient_address,
'source_token': tx['to'],
'destination_token': tx['to'],
'from_value': token_value,
'to_value': token_value,
}
(tx_dict, cache_id) = cache_tx_dict(tx_dict, session=session)
session.close()
return (tx_hash_hex, cache_id)

View File

@@ -9,6 +9,7 @@ from chainlib.chain import ChainSpec
from chainlib.eth.address import is_checksum_address
from chainlib.connection import RPCConnection
from chainqueue.db.enum import StatusBits
from chainqueue.sql.tx import cache_tx_dict
from chainlib.eth.gas import (
balance,
price,
@@ -133,20 +134,17 @@ def cache_gas_data(
session = SessionBase.create_session()
tx_cache = TxCache(
tx_hash_hex,
tx['from'],
tx['to'],
ZERO_ADDRESS,
ZERO_ADDRESS,
tx['value'],
tx['value'],
session=session,
)
tx_dict = {
'hash': tx_hash_hex,
'from': tx['from'],
'to': tx['to'],
'source_token': ZERO_ADDRESS,
'destination_token': ZERO_ADDRESS,
'from_value': tx['value'],
'to_value': tx['value'],
}
session.add(tx_cache)
session.commit()
cache_id = tx_cache.id
(tx_dict, cache_id) = cache_tx_dict(tx_dict, session=session)
session.close()
return (tx_hash_hex, cache_id)

View File

@@ -18,7 +18,6 @@ from hexathon import (
strip_0x,
)
from chainqueue.db.models.tx import Otx
from chainqueue.db.models.tx import TxCache
from chainqueue.db.enum import StatusBits
from chainqueue.error import NotLocalTxError
from potaahto.symbols import snake_and_camel

View File

@@ -14,9 +14,11 @@ from chainlib.eth.tx import (
)
from chainlib.eth.block import block_by_number
from chainlib.eth.contract import abi_decode_single
from chainlib.eth.constant import ZERO_ADDRESS
from hexathon import strip_0x
from cic_eth_registry import CICRegistry
from cic_eth_registry.erc20 import ERC20Token
from cic_eth_registry.error import UnknownContractError
from chainqueue.db.models.otx import Otx
from chainqueue.db.enum import StatusEnum
from chainqueue.sql.query import get_tx_cache
@@ -114,9 +116,6 @@ def list_tx_by_bloom(self, bloomspec, address, chain_spec_dict):
# TODO: pass through registry to validate declarator entry of token
#token = registry.by_address(tx['to'], sender_address=self.call_address)
token = ERC20Token(chain_spec, rpc, tx['to'])
token_symbol = token.symbol
token_decimals = token.decimals
times = tx_times(tx['hash'], chain_spec)
tx_r = {
'hash': tx['hash'],
@@ -126,12 +125,6 @@ def list_tx_by_bloom(self, bloomspec, address, chain_spec_dict):
'destination_value': tx_token_value,
'source_token': tx['to'],
'destination_token': tx['to'],
'source_token_symbol': token_symbol,
'destination_token_symbol': token_symbol,
'source_token_decimals': token_decimals,
'destination_token_decimals': token_decimals,
'source_token_chain': chain_str,
'destination_token_chain': chain_str,
'nonce': tx['nonce'],
}
if times['queue'] != None:
@@ -146,8 +139,8 @@ def list_tx_by_bloom(self, bloomspec, address, chain_spec_dict):
# TODO: Surely it must be possible to optimize this
# TODO: DRY this with callback filter in cic_eth/runnable/manager
# TODO: Remove redundant fields from end representation (timestamp, tx_hash)
@celery_app.task()
def tx_collate(tx_batches, chain_spec_dict, offset, limit, newest_first=True):
@celery_app.task(bind=True, base=BaseTask)
def tx_collate(self, tx_batches, chain_spec_dict, offset, limit, newest_first=True, verify_contracts=True):
"""Merges transaction data from multiple sources and sorts them in chronological order.
:param tx_batches: Transaction data inputs
@@ -196,6 +189,32 @@ def tx_collate(tx_batches, chain_spec_dict, offset, limit, newest_first=True):
if newest_first:
ks.reverse()
for k in ks:
txs.append(txs_by_block[k])
tx = txs_by_block[k]
if verify_contracts:
try:
tx = verify_and_expand(tx, chain_spec, sender_address=BaseTask.call_address)
except UnknownContractError:
logg.error('verify failed on tx {}, skipping'.format(tx['hash']))
continue
txs.append(tx)
return txs
def verify_and_expand(tx, chain_spec, sender_address=ZERO_ADDRESS):
rpc = RPCConnection.connect(chain_spec, 'default')
registry = CICRegistry(chain_spec, rpc)
if tx.get('source_token_symbol') == None and tx['source_token'] != ZERO_ADDRESS:
r = registry.by_address(tx['source_token'], sender_address=sender_address)
token = ERC20Token(chain_spec, rpc, tx['source_token'])
tx['source_token_symbol'] = token.symbol
tx['source_token_decimals'] = token.decimals
if tx.get('destination_token_symbol') == None and tx['destination_token'] != ZERO_ADDRESS:
r = registry.by_address(tx['destination_token'], sender_address=sender_address)
token = ERC20Token(chain_spec, rpc, tx['destination_token'])
tx['destination_token_symbol'] = token.symbol
tx['destination_token_decimals'] = token.decimals
return tx

View File

@@ -27,7 +27,7 @@ def database_engine(
SessionBase.poolable = False
dsn = dsn_from_config(load_config)
#SessionBase.connect(dsn, True)
SessionBase.connect(dsn, debug=load_config.get('DATABASE_DEBUG') != None)
SessionBase.connect(dsn, debug=load_config.true('DATABASE_DEBUG'))
return dsn

View File

@@ -100,6 +100,7 @@ def get_upcoming_tx(chain_spec, status=StatusEnum.READYSEND, not_status=None, re
q_outer = q_outer.join(Lock, isouter=True)
q_outer = q_outer.filter(or_(Lock.flags==None, Lock.flags.op('&')(LockEnum.SEND.value)==0))
if not is_alive(status):
SessionBase.release_session(session)
raise ValueError('not a valid non-final tx value: {}'.format(status))

View File

@@ -80,7 +80,12 @@ def main():
t = api.create_account(register=register)
ps.get_message()
o = ps.get_message(timeout=args.timeout)
try:
o = ps.get_message(timeout=args.timeout)
except TimeoutError as e:
sys.stderr.write('got no new address from cic-eth before timeout: {}\n'.format(e))
sys.exit(1)
ps.unsubscribe()
m = json.loads(o['data'])
print(m['result'])

View File

@@ -90,6 +90,7 @@ class DispatchSyncer:
def __init__(self, chain_spec):
self.chain_spec = chain_spec
self.session = None
def chain(self):
@@ -100,16 +101,18 @@ class DispatchSyncer:
c = len(txs.keys())
logg.debug('processing {} txs {}'.format(c, list(txs.keys())))
chain_str = str(self.chain_spec)
session = SessionBase.create_session()
self.session = SessionBase.create_session()
for k in txs.keys():
tx_raw = txs[k]
tx_raw_bytes = bytes.fromhex(strip_0x(tx_raw))
tx = unpack(tx_raw_bytes, self.chain_spec)
try:
set_reserved(self.chain_spec, tx['hash'], session=session)
set_reserved(self.chain_spec, tx['hash'], session=self.session)
self.session.commit()
except NotLocalTxError as e:
logg.warning('dispatcher was triggered with non-local tx {}'.format(tx['hash']))
self.session.rollback()
continue
s_check = celery.signature(
@@ -132,16 +135,25 @@ class DispatchSyncer:
s_check.link(s_send)
t = s_check.apply_async()
logg.info('processed {}'.format(k))
self.session.close()
self.session = None
def loop(self, w3, interval):
def loop(self, interval):
while run:
txs = {}
typ = StatusBits.QUEUED
utxs = get_upcoming_tx(self.chain_spec, typ)
for k in utxs.keys():
txs[k] = utxs[k]
self.process(w3, txs)
try:
conn = RPCConnection.connect(self.chain_spec, 'default')
self.process(conn, txs)
except ConnectionError as e:
if self.session != None:
self.session.close()
self.session = None
logg.error('connection to node failed: {}'.format(e))
if len(utxs) > 0:
time.sleep(self.yield_delay)
@@ -151,8 +163,7 @@ class DispatchSyncer:
def main():
syncer = DispatchSyncer(chain_spec)
conn = RPCConnection.connect(chain_spec, 'default')
syncer.loop(conn, float(config.get('DISPATCHER_LOOP_INTERVAL')))
syncer.loop(float(config.get('DISPATCHER_LOOP_INTERVAL')))
sys.exit(0)

View File

@@ -11,6 +11,7 @@ from chainqueue.db.enum import StatusBits
from chainqueue.db.models.tx import TxCache
from chainqueue.db.models.otx import Otx
from chainqueue.sql.query import get_paused_tx_cache as get_paused_tx
from chainlib.eth.address import to_checksum_address
# local imports
from cic_eth.db.models.base import SessionBase
@@ -47,12 +48,13 @@ class GasFilter(SyncFilter):
SessionBase.release_session(session)
address = to_checksum_address(r[0])
logg.info('resuming gas-in-waiting txs for {}'.format(r[0]))
if len(txs) > 0:
s = create_check_gas_task(
list(txs.values()),
self.chain_spec,
r[0],
address,
0,
tx_hashes_hex=list(txs.keys()),
queue=self.queue,

View File

@@ -12,20 +12,24 @@ from hexathon import (
# local imports
from .base import SyncFilter
logg = logging.getLogger().getChild(__name__)
logg = logging.getLogger(__name__)
account_registry_add_log_hash = '0x9cc987676e7d63379f176ea50df0ae8d2d9d1141d1231d4ce15b5965f73c9430'
class RegistrationFilter(SyncFilter):
def __init__(self, chain_spec, queue):
def __init__(self, chain_spec, contract_address, queue=None):
self.chain_spec = chain_spec
self.queue = queue
self.contract_address = contract_address
def filter(self, conn, block, tx, db_session=None):
registered_address = None
if self.contract_address != tx.inputs[0]:
logg.debug('not an account registry tx; {} != {}'.format(self.contract_address, tx.inputs[0]))
return None
for l in tx.logs:
event_topic_hex = l['topics'][0]
if event_topic_hex == account_registry_add_log_hash:

View File

@@ -78,6 +78,14 @@ chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
cic_base.rpc.setup(chain_spec, config.get('ETH_PROVIDER'))
rpc = RPCConnection.connect(chain_spec, 'default')
registry = None
try:
registry = connect_registry(rpc, chain_spec, config.get('CIC_REGISTRY_ADDRESS'))
except UnknownContractError as e:
logg.exception('Registry contract connection failed for {}: {}'.format(config.get('CIC_REGISTRY_ADDRESS'), e))
sys.exit(1)
logg.info('connected contract registry {}'.format(config.get('CIC_REGISTRY_ADDRESS')))
def main():
@@ -85,7 +93,6 @@ def main():
celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL'))
# Connect to blockchain with chainlib
rpc = RPCConnection.connect(chain_spec, 'default')
o = block_latest()
r = rpc.do(o)
@@ -151,7 +158,8 @@ def main():
tx_filter = TxFilter(chain_spec, config.get('_CELERY_QUEUE'))
registration_filter = RegistrationFilter(chain_spec, config.get('_CELERY_QUEUE'))
account_registry_address = registry.by_name('AccountRegistry')
registration_filter = RegistrationFilter(chain_spec, account_registry_address, queue=config.get('_CELERY_QUEUE'))
gas_filter = GasFilter(chain_spec, config.get('_CELERY_QUEUE'))

View File

@@ -9,8 +9,8 @@ import semver
version = (
0,
12,
0,
'alpha.2',
2,
'alpha.3',
)
version_object = semver.VersionInfo(

View File

@@ -6,4 +6,4 @@ HOST=localhost
PORT=5432
ENGINE=sqlite
DRIVER=pysqlite
DEBUG=
DEBUG=0

View File

@@ -13,7 +13,7 @@ ARG GITLAB_PYTHON_REGISTRY="https://gitlab.com/api/v4/projects/27624814/packages
# --force-reinstall \
# --extra-index-url $GITLAB_PYTHON_REGISTRY --extra-index-url $EXTRA_INDEX_URL \
# -r requirements.txt
COPY *requirements.txt .
COPY *requirements.txt ./
RUN --mount=type=cache,mode=0755,target=/root/.cache/pip \
pip install --index-url https://pypi.org/simple \
--extra-index-url $GITLAB_PYTHON_REGISTRY \

View File

@@ -1,3 +1,3 @@
celery==4.4.7
chainlib~=0.0.5a1
chainlib-eth>=0.0.6a1,<0.1.0
semver==2.13.0

View File

@@ -1,15 +1,15 @@
chainsyncer[sql]~=0.0.3a3
chainqueue~=0.0.2b5
chainqueue>=0.0.3a2,<0.1.0
chainsyncer[sql]>=0.0.5a1,<0.1.0
alembic==1.4.2
confini~=0.3.6rc4
confini>=0.3.6rc4,<0.5.0
redis==3.5.3
hexathon~=0.0.1a7
pycryptodome==3.10.1
liveness~=0.0.1a7
eth-address-index~=0.1.2a1
eth-accounts-index~=0.0.12a1
cic-eth-registry~=0.5.6a1
erc20-faucet~=0.2.2a1
erc20-transfer-authorization~=0.3.2a1
sarafu-faucet~=0.0.4a1
eth-address-index>=0.1.3a1,<0.2.0
eth-accounts-index>=0.0.13a1,<0.1.0
cic-eth-registry>=0.5.7a1,<0.6.0
erc20-faucet>=0.2.3a1,<0.3.0
erc20-transfer-authorization>=0.3.3a1,<0.4.0
sarafu-faucet>=0.0.4a5,<0.1.0
moolb~=0.1.1b2

View File

@@ -6,4 +6,4 @@ pytest-redis==2.0.0
redis==3.5.3
eth-tester==0.5.0b3
py-evm==0.3.0a20
eth-erc20~=0.0.10a2
eth-erc20~=0.0.11a1

View File

@@ -18,6 +18,7 @@ def test_filter_bogus(
cic_registry,
contract_roles,
register_lookups,
account_registry,
):
fltrs = [
@@ -26,7 +27,7 @@ def test_filter_bogus(
TxFilter(default_chain_spec, None),
CallbackFilter(default_chain_spec, None, None, caller_address=contract_roles['CONTRACT_DEPLOYER']),
StragglerFilter(default_chain_spec, None),
RegistrationFilter(default_chain_spec, queue=None),
RegistrationFilter(default_chain_spec, account_registry, queue=None),
]
for fltr in fltrs:

View File

@@ -1,3 +1,7 @@
# standard imports
import logging
import os
# external imports
from eth_accounts_index.registry import AccountRegistry
from chainlib.connection import RPCConnection
@@ -14,12 +18,17 @@ from chainlib.eth.block import (
Block,
)
from erc20_faucet import Faucet
from hexathon import strip_0x
from hexathon import (
strip_0x,
add_0x,
)
from chainqueue.sql.query import get_account_tx
# local imports
from cic_eth.runnable.daemons.filters.register import RegistrationFilter
logg = logging.getLogger()
def test_register_filter(
default_chain_spec,
@@ -60,7 +69,11 @@ def test_register_filter(
tx = Tx(tx_src, block=block, rcpt=rcpt)
tx.apply_receipt(rcpt)
fltr = RegistrationFilter(default_chain_spec, queue=None)
fltr = RegistrationFilter(default_chain_spec, add_0x(os.urandom(20).hex()), queue=None)
t = fltr.filter(eth_rpc, block, tx, db_session=init_database)
assert t == None
fltr = RegistrationFilter(default_chain_spec, account_registry, queue=None)
t = fltr.filter(eth_rpc, block, tx, db_session=init_database)
t.get_leaf()

View File

@@ -290,6 +290,7 @@ def test_fix_nonce(
txs = get_nonce_tx_cache(default_chain_spec, 3, agent_roles['ALICE'], session=init_database)
ks = txs.keys()
assert len(ks) == 2
for k in ks:
hsh = add_0x(k)
otx = Otx.load(hsh, session=init_database)

View File

@@ -184,7 +184,7 @@ def test_admin_api_account(
api = AdminApi(eth_rpc, queue=None, call_address=contract_roles['CONTRACT_DEPLOYER'])
r = api.account(default_chain_spec, agent_roles['ALICE'])
r = api.account(default_chain_spec, agent_roles['ALICE'], include_sender=True, include_recipient=True)
assert len(r) == 5
api = AdminApi(eth_rpc, queue=None, call_address=contract_roles['CONTRACT_DEPLOYER'])

View File

@@ -1,4 +1,5 @@
# external imports
import celery
from chainlib.eth.nonce import RPCNonceOracle
from chainlib.eth.tx import (
receipt,
@@ -20,6 +21,7 @@ def test_translate(
cic_registry,
init_celery_tasks,
register_lookups,
celery_session_worker,
):
nonce_oracle = RPCNonceOracle(contract_roles['CONTRACT_DEPLOYER'], eth_rpc)
@@ -46,6 +48,20 @@ def test_translate(
'recipient': agent_roles['BOB'],
'recipient_label': None,
}
tx = translate_tx_addresses(tx, [contract_roles['CONTRACT_DEPLOYER']], default_chain_spec.asdict())
assert tx['sender_label'] == 'alice'
assert tx['recipient_label'] == 'bob'
#tx = translate_tx_addresses(tx, [contract_roles['CONTRACT_DEPLOYER']], default_chain_spec.asdict())
s = celery.signature(
'cic_eth.ext.address.translate_tx_addresses',
[
tx,
[contract_roles['CONTRACT_DEPLOYER']],
default_chain_spec.asdict(),
],
queue=None,
)
t = s.apply_async()
r = t.get_leaf()
assert t.successful()
assert r['sender_label'] == 'alice'
assert r['recipient_label'] == 'bob'

View File

@@ -0,0 +1,92 @@
# external imports
import celery
import pytest
from chainlib.connection import RPCConnection
from chainlib.eth.constant import ZERO_ADDRESS
from chainlib.eth.gas import (
RPCGasOracle,
)
from chainlib.eth.tx import (
TxFormat,
unpack,
)
from chainlib.eth.nonce import RPCNonceOracle
from eth_erc20 import ERC20
from hexathon import (
add_0x,
strip_0x,
)
from chainqueue.db.models.tx import TxCache
from chainqueue.db.models.otx import Otx
def test_ext_tx_collate(
default_chain_spec,
init_database,
eth_rpc,
eth_signer,
custodial_roles,
agent_roles,
foo_token,
bar_token,
register_tokens,
cic_registry,
register_lookups,
init_celery_tasks,
celery_session_worker,
):
rpc = RPCConnection.connect(default_chain_spec, 'default')
nonce_oracle = RPCNonceOracle(custodial_roles['FOO_TOKEN_GIFTER'], eth_rpc)
gas_oracle = RPCGasOracle(eth_rpc)
c = ERC20(default_chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle)
transfer_value_foo = 1000
transfer_value_bar = 1024
(tx_hash_hex, tx_signed_raw_hex) = c.transfer(foo_token, custodial_roles['FOO_TOKEN_GIFTER'], agent_roles['ALICE'], transfer_value_foo, tx_format=TxFormat.RLP_SIGNED)
tx = unpack(bytes.fromhex(strip_0x(tx_signed_raw_hex)), default_chain_spec)
otx = Otx(
tx['nonce'],
tx_hash_hex,
tx_signed_raw_hex,
)
init_database.add(otx)
init_database.commit()
txc = TxCache(
tx_hash_hex,
tx['from'],
tx['to'],
foo_token,
bar_token,
transfer_value_foo,
transfer_value_bar,
666,
13,
session=init_database,
)
init_database.add(txc)
init_database.commit()
s = celery.signature(
'cic_eth.ext.tx.tx_collate',
[
{tx_hash_hex: tx_signed_raw_hex},
default_chain_spec.asdict(),
0,
100,
],
queue=None,
)
t = s.apply_async()
r = t.get_leaf()
assert t.successful()
assert len(r) == 1
tx = r[0]
assert tx['source_token_symbol'] == 'FOO'
assert tx['source_token_decimals'] == 6
assert tx['destination_token_symbol'] == 'BAR'
assert tx['destination_token_decimals'] == 9

View File

@@ -1,6 +1,7 @@
# third-party imports
# extended imports
import pytest
import uuid
import unittest
# local imports
from cic_eth.db.models.nonce import (
@@ -55,7 +56,7 @@ def test_nonce_reserve(
o = q.first()
assert o.nonce == 43
nonce = NonceReservation.release(eth_empty_accounts[0], str(uu))
nonce = NonceReservation.release(eth_empty_accounts[0], str(uu), session=init_database)
init_database.commit()
assert nonce == (str(uu), 42)

View File

@@ -9,6 +9,11 @@ from chainlib.eth.gas import (
Gas,
)
from chainlib.chain import ChainSpec
from hexathon import (
add_0x,
strip_0x,
uniform as hex_uniform,
)
# local imports
from cic_eth.db.enum import LockEnum
@@ -34,7 +39,10 @@ def test_upcoming_with_lock(
gas_oracle = RPCGasOracle(eth_rpc)
c = Gas(default_chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle)
(tx_hash_hex, tx_rpc) = c.create(agent_roles['ALICE'], agent_roles['BOB'], 100 * (10 ** 6))
alice_normal = add_0x(hex_uniform(strip_0x(agent_roles['ALICE'])))
bob_normal = add_0x(hex_uniform(strip_0x(agent_roles['BOB'])))
(tx_hash_hex, tx_rpc) = c.create(alice_normal, bob_normal, 100 * (10 ** 6))
tx_signed_raw_hex = tx_rpc['params'][0]
register_tx(tx_hash_hex, tx_signed_raw_hex, default_chain_spec, None, session=init_database)
@@ -43,12 +51,12 @@ def test_upcoming_with_lock(
txs = get_upcoming_tx(default_chain_spec, StatusEnum.PENDING)
assert len(txs.keys()) == 1
Lock.set(str(default_chain_spec), LockEnum.SEND, address=agent_roles['ALICE'])
Lock.set(str(default_chain_spec), LockEnum.SEND, address=alice_normal)
txs = get_upcoming_tx(default_chain_spec, StatusEnum.PENDING)
txs = get_upcoming_tx(default_chain_spec, status=StatusEnum.PENDING)
assert len(txs.keys()) == 0
(tx_hash_hex, tx_rpc) = c.create(agent_roles['BOB'], agent_roles['ALICE'], 100 * (10 ** 6))
(tx_hash_hex, tx_rpc) = c.create(bob_normal, alice_normal, 100 * (10 ** 6))
tx_signed_raw_hex = tx_rpc['params'][0]
register_tx(tx_hash_hex, tx_signed_raw_hex, default_chain_spec, None, session=init_database)

View File

@@ -1,7 +1,7 @@
crypto-dev-signer~=0.4.14b6
chainqueue~=0.0.2b5
confini~=0.3.6rc4
cic-eth-registry~=0.5.6a1
crypto-dev-signer>=0.4.14b7,<=0.4.14
chainqueue~=0.0.2b6
confini>=0.3.6rc4,<0.5.0
cic-eth-registry>=0.5.7a1,<0.6.0
redis==3.5.3
hexathon~=0.0.1a7
pycryptodome==3.10.1

View File

@@ -34,6 +34,8 @@ elif args.v:
config = confini.Config(args.c, args.env_prefix)
config.process()
config.add(args.q, '_CELERY_QUEUE', True)
config.censor('API_KEY', 'AFRICASTALKING')
config.censor('API_USERNAME', 'AFRICASTALKING')
config.censor('PASSWORD', 'DATABASE')
logg.debug('config loaded from {}:\n{}'.format(args.c, config))

View File

@@ -9,7 +9,7 @@ import semver
logg = logging.getLogger()
version = (0, 4, 0, 'alpha.7')
version = (0, 4, 0, 'alpha.10')
version_object = semver.VersionInfo(
major=version[0],

View File

@@ -12,14 +12,11 @@ RUN --mount=type=cache,mode=0755,target=/root/.cache/pip \
--extra-index-url $GITLAB_PYTHON_REGISTRY --extra-index-url $EXTRA_INDEX_URL \
-r requirements.txt
COPY . .
COPY . .
RUN python setup.py install
# TODO please review..can this go into requirements?
RUN pip install $pip_extra_index_url_flag .[africastalking,notifylog]
COPY docker/*.sh .
RUN chmod +x *.sh
# ini files in config directory defines the configurable parameters for the application
# they can all be overridden by environment variables
@@ -27,4 +24,5 @@ COPY docker/*.sh .
COPY .config/ /usr/local/etc/cic-notify/
COPY cic_notify/db/migrations/ /usr/local/share/cic-notify/alembic/
ENTRYPOINT []

View File

@@ -11,14 +11,12 @@ RUN pip install --index-url https://pypi.org/simple \
--extra-index-url $GITLAB_PYTHON_REGISTRY --extra-index-url $EXTRA_INDEX_URL \
-r requirements.txt
COPY . .
COPY . .
RUN python setup.py install
# TODO please review..can this go into requirements?
RUN pip install $pip_extra_index_url_flag .[africastalking,notifylog]
COPY docker/*.sh .
RUN chmod +x *.sh
# ini files in config directory defines the configurable parameters for the application
# they can all be overridden by environment variables

View File

@@ -1,3 +1,3 @@
#!/bin/bash
migrate.py -c /usr/local/etc/cic-notify --migrations-dir /usr/local/share/cic-notify/alembic -vv
set -e
python scripts/migrate.py -c /usr/local/etc/cic-notify --migrations-dir /usr/local/share/cic-notify/alembic -vv

View File

@@ -1,5 +1,5 @@
#!/bin/bash
. ./db.sh
. /root/db.sh
/usr/local/bin/cic-notify-tasker -vv $@

View File

@@ -1 +1,7 @@
cic_base[full_graph]==0.1.3a3+build.984b5cff
confini~=0.4.1a1
africastalking==1.2.3
SQLAlchemy==1.3.20
alembic==1.4.2
psycopg2==2.8.6
celery==4.4.7
redis==3.5.3

View File

@@ -35,9 +35,10 @@ elif args.v:
config = confini.Config(args.c, args.env_prefix)
config.process()
config.censor('API_KEY', 'AFRICASTALKING')
config.censor('API_USERNAME', 'AFRICASTALKING')
config.censor('PASSWORD', 'DATABASE')
#config.censor('PASSWORD', 'SSL')
logg.debug('config:\n{}'.format(config))
logg.debug('config loaded from {}:\n{}'.format(args.c, config))
migrations_dir = os.path.join(args.migrations_dir, config.get('DATABASE_ENGINE'))
if not os.path.isdir(migrations_dir):

View File

@@ -29,18 +29,11 @@ packages =
cic_notify.db
cic_notify.db.models
cic_notify.ext
cic_notify.tasks
cic_notify.tasks.sms
cic_notify.runnable
scripts =
scripts/migrate.py
[options.extras_require]
africastalking = africastalking==1.2.3
notifylog = psycopg2==2.8.6
testing =
pytest==6.0.1
pytest-celery==0.0.0a1
pytest-mock==3.3.1
pysqlite3==0.4.3
./scripts/migrate.py
[options.entry_points]
console_scripts =

View File

@@ -2,3 +2,4 @@ pytest~=6.0.1
pytest-celery~=0.0.0a1
pytest-mock~=3.3.1
pysqlite3~=0.4.3
pytest-cov==2.10.1

View File

@@ -1,25 +0,0 @@
[app]
ALLOWED_IP=0.0.0.0/0
LOCALE_FALLBACK=en
LOCALE_PATH=/usr/src/cic-ussd/var/lib/locale/
MAX_BODY_LENGTH=1024
PASSWORD_PEPPER=QYbzKff6NhiQzY3ygl2BkiKOpER8RE/Upqs/5aZWW+I=
SERVICE_CODE=*483*46#,*483*061#,*384*96#
SUPPORT_PHONE_NUMBER=0757628885
[phone_number]
REGION=KE
[ussd]
MENU_FILE=/usr/src/data/ussd_menu.json
user =
pass =
[statemachine]
STATES=/usr/src/cic-ussd/states/
TRANSITIONS=/usr/src/cic-ussd/transitions/
[client]
host =
port =
ssl =

View File

@@ -1,10 +0,0 @@
[database]
NAME=cic_ussd
USER=postgres
PASSWORD=
HOST=localhost
PORT=5432
ENGINE=postgresql
DRIVER=psycopg2
DEBUG=0
POOL_SIZE=1

View File

@@ -1,9 +0,0 @@
[celery]
BROKER_URL=redis://
RESULT_URL=redis://
[redis]
HOSTNAME=redis
PASSWORD=
PORT=6379
DATABASE=0

View File

@@ -1,15 +0,0 @@
[app]
ALLOWED_IP=127.0.0.1
LOCALE_FALLBACK=en
LOCALE_PATH=var/lib/locale/
MAX_BODY_LENGTH=1024
PASSWORD_PEPPER=QYbzKff6NhiQzY3ygl2BkiKOpER8RE/Upqs/5aZWW+I=
SERVICE_CODE=*483*46#
SUPPORT_PHONE_NUMBER=0757628885
[ussd]
MENU_FILE=/usr/local/lib/python3.8/site-packages/cic_ussd/db/ussd_menu.json
[statemachine]
STATES=/usr/src/cic-ussd/states/
TRANSITIONS=/usr/src/cic-ussd/transitions/

View File

@@ -1,8 +0,0 @@
[database]
NAME=cic_ussd_test
USER=postgres
PASSWORD=
HOST=localhost
PORT=5432
ENGINE=sqlite
DRIVER=pysqlite

View File

@@ -1,5 +0,0 @@
[pgp]
export_dir = /usr/src/pgp/keys/
keys_path = /usr/src/secrets/
private_keys = privatekeys_meta.asc
passphrase =

View File

@@ -1,9 +0,0 @@
[celery]
BROKER_URL = filesystem://
RESULT_URL = filesystem://
[redis]
HOSTNAME=localhost
PASSWORD=
PORT=6379
DATABASE=0

View File

@@ -31,7 +31,7 @@ test-mr-cic-ussd:
pip install --extra-index-url https://pip.grassrootseconomics.net:8433
--extra-index-url https://gitlab.com/api/v4/projects/27624814/packages/pypi/simple
-r test_requirements.txt
- export PYTHONPATH=. && pytest -x --cov=cic_eth --cov-fail-under=90 --cov-report term-missing tests
- export PYTHONPATH=. && pytest -x --cov=cic_ussd --cov-fail-under=90 --cov-report term-missing tests/cic_ussd
needs: ["build-mr-cic-ussd"]
rules:
- if: $CI_PIPELINE_SOURCE == "merge_request_event"

View File

@@ -1,49 +0,0 @@
# standard imports
import json
# third-party imports
from cic_eth.api import Api
from cic_types.models.person import Person
from cic_types.processor import generate_metadata_pointer
# local imports
from cic_ussd.chain import Chain
from cic_ussd.db.models.account import Account
from cic_ussd.metadata import blockchain_address_to_metadata_pointer
from cic_ussd.redis import get_cached_data
def define_account_tx_metadata(user: Account):
# get sender metadata
identifier = blockchain_address_to_metadata_pointer(
blockchain_address=user.blockchain_address
)
key = generate_metadata_pointer(
identifier=identifier,
cic_type=':cic.person'
)
account_metadata = get_cached_data(key=key)
if account_metadata:
account_metadata = json.loads(account_metadata)
person = Person()
deserialized_person = person.deserialize(person_data=account_metadata)
given_name = deserialized_person.given_name
family_name = deserialized_person.family_name
phone_number = deserialized_person.tel
return f'{given_name} {family_name} {phone_number}'
else:
phone_number = user.phone_number
return phone_number
def retrieve_account_statement(blockchain_address: str):
chain_str = Chain.spec.__str__()
cic_eth_api = Api(
chain_str=chain_str,
callback_queue='cic-ussd',
callback_task='cic_ussd.tasks.callback_handler.process_statement_callback',
callback_param=blockchain_address
)
cic_eth_api.list(address=blockchain_address, limit=9)

View File

@@ -0,0 +1,90 @@
# standard imports
import json
import logging
from typing import Optional
# third-party imports
from cic_eth.api import Api
# local imports
from cic_ussd.account.transaction import from_wei
from cic_ussd.cache import cache_data_key, get_cached_data
from cic_ussd.error import CachedDataNotFoundError
logg = logging.getLogger()
def get_balances(address: str,
chain_str: str,
token_symbol: str,
asynchronous: bool = False,
callback_param: any = None,
callback_queue='cic-ussd',
callback_task='cic_ussd.tasks.callback_handler.process_balances_callback') -> Optional[list]:
"""This function queries cic-eth for an account's balances, It provides a means to receive the balance either
asynchronously or synchronously.. It returns a dictionary containing the network, outgoing and incoming balances.
:param address: Ethereum address of an account.
:type address: str, 0x-hex
:param chain_str: The chain name and network id.
:type chain_str: str
:param asynchronous: Boolean value checking whether to return balances asynchronously.
:type asynchronous: bool
:param callback_param: Data to be sent along with the callback containing balance data.
:type callback_param: any
:param callback_queue:
:type callback_queue:
:param callback_task: A celery task path to which callback data should be sent.
:type callback_task: str
:param token_symbol: ERC20 token symbol of the account whose balance is being queried.
:type token_symbol: str
:return: A list containing balance data if called synchronously. | None
:rtype: list | None
"""
logg.debug(f'retrieving balance for address: {address}')
if asynchronous:
cic_eth_api = Api(
chain_str=chain_str,
callback_queue=callback_queue,
callback_task=callback_task,
callback_param=callback_param
)
cic_eth_api.balance(address=address, token_symbol=token_symbol)
else:
cic_eth_api = Api(chain_str=chain_str)
balance_request_task = cic_eth_api.balance(
address=address,
token_symbol=token_symbol)
return balance_request_task.get()
def calculate_available_balance(balances: dict) -> float:
"""This function calculates an account's balance at a specific point in time by computing the difference from the
outgoing balance and the sum of the incoming and network balances.
:param balances: incoming, network and outgoing balances.
:type balances: dict
:return: Token value of the available balance.
:rtype: float
"""
incoming_balance = balances.get('balance_incoming')
outgoing_balance = balances.get('balance_outgoing')
network_balance = balances.get('balance_network')
available_balance = (network_balance + incoming_balance) - outgoing_balance
return from_wei(value=available_balance)
def get_cached_available_balance(blockchain_address: str) -> float:
"""This function attempts to retrieve balance data from the redis cache.
:param blockchain_address: Ethereum address of an account.
:type blockchain_address: str
:raises CachedDataNotFoundError: No cached balance data could be found.
:return: Operational balance of an account.
:rtype: float
"""
identifier = bytes.fromhex(blockchain_address[2:])
key = cache_data_key(identifier, salt=':cic.balances')
cached_balances = get_cached_data(key=key)
if cached_balances:
return calculate_available_balance(json.loads(cached_balances))
else:
raise CachedDataNotFoundError(f'No cached available balance for address: {blockchain_address}')

View File

@@ -0,0 +1,20 @@
# standard imports
# external imports
# local imports
def gender():
return {
'1': 'male',
'2': 'female',
'3': 'other'
}
def language():
return {
'1': 'en',
'2': 'sw'
}

View File

@@ -0,0 +1,44 @@
# standard imports
import json
import logging
from typing import Optional
# external imports
from chainlib.hash import strip_0x
from cic_types.models.person import Person
# local imports
from cic_ussd.metadata import PreferencesMetadata
logg = logging.getLogger(__name__)
def get_cached_preferred_language(blockchain_address: str) -> Optional[str]:
"""This function retrieves an account's set preferred language from preferences metadata in redis cache.
:param blockchain_address:
:type blockchain_address:
:return: Account's set preferred language | Fallback preferred language.
:rtype: str
"""
identifier = bytes.fromhex(strip_0x(blockchain_address))
preferences_metadata_handler = PreferencesMetadata(identifier)
cached_preferences_metadata = preferences_metadata_handler.get_cached_metadata()
if cached_preferences_metadata:
preferences_metadata = json.loads(cached_preferences_metadata)
return preferences_metadata.get('preferred_language')
return None
def parse_account_metadata(account_metadata: dict) -> str:
"""
:param account_metadata:
:type account_metadata:
:return:
:rtype:
"""
person = Person()
deserialized_person = person.deserialize(person_data=account_metadata)
given_name = deserialized_person.given_name
family_name = deserialized_person.family_name
phone_number = deserialized_person.tel
return f'{given_name} {family_name} {phone_number}'

View File

@@ -0,0 +1,111 @@
# standard imports
import datetime
import logging
from typing import Optional
# external imports
import celery
from chainlib.hash import strip_0x
from cic_eth.api import Api
# local import
from cic_ussd.account.chain import Chain
from cic_ussd.account.transaction import from_wei
from cic_ussd.cache import cache_data_key, get_cached_data
from cic_ussd.translation import translation_for
logg = logging.getLogger(__name__)
def filter_statement_transactions(transaction_list: list) -> list:
"""This function parses a transaction list and removes all transactions that entail interactions with the
zero address as the source transaction.
:param transaction_list: Array containing transaction objects.
:type transaction_list: list
:return: Transactions exclusive of the zero address transactions.
:rtype: list
"""
return [tx for tx in transaction_list if tx.get('source_token') != '0x0000000000000000000000000000000000000000']
def generate(querying_party: str, queue: Optional[str], transaction: dict):
"""
:param querying_party:
:type querying_party:
:param queue:
:type queue:
:param transaction:
:type transaction:
:return:
:rtype:
"""
s_generate_statement = celery.signature(
'cic_ussd.tasks.processor.generate_statement', [querying_party, transaction], queue=queue
)
s_generate_statement.apply_async()
def get_cached_statement(blockchain_address: str) -> bytes:
"""This function retrieves an account's cached record of a specific number of transactions in chronological order.
:param blockchain_address: Bytes representation of the hex value of an account's blockchain address.
:type blockchain_address: bytes
:return: Account's transactions statements.
:rtype: str
"""
identifier = bytes.fromhex(strip_0x(blockchain_address))
key = cache_data_key(identifier=identifier, salt=':cic.statement')
return get_cached_data(key=key)
def parse_statement_transactions(statement: list):
"""This function extracts information for transaction objects loaded from the redis cache and structures the data in
a format that is appropriate for the ussd interface.
:param statement: A list of transaction objects.
:type statement: list
:return:
:rtype:
"""
parsed_transactions = []
for transaction in statement:
action_tag = transaction.get('action_tag')
amount = from_wei(transaction.get('token_value'))
direction_tag = transaction.get('direction_tag')
token_symbol = transaction.get('token_symbol')
metadata_id = transaction.get('metadata_id')
timestamp = datetime.datetime.now().strftime('%d/%m/%y, %H:%M')
transaction_repr = f'{action_tag} {amount} {token_symbol} {direction_tag} {metadata_id} {timestamp}'
parsed_transactions.append(transaction_repr)
return parsed_transactions
def query_statement(blockchain_address: str, limit: int = 9):
"""This function queries cic-eth for a set of chronologically ordered number of transactions associated with
an account.
:param blockchain_address: Ethereum address associated with an account.
:type blockchain_address: str, 0x-hex
:param limit: Number of transactions to be returned.
:type limit: int
"""
logg.debug(f'retrieving balance for address: {blockchain_address}')
chain_str = Chain.spec.__str__()
cic_eth_api = Api(
chain_str=chain_str,
callback_queue='cic-ussd',
callback_task='cic_ussd.tasks.callback_handler.statement_callback',
callback_param=blockchain_address
)
cic_eth_api.list(address=blockchain_address, limit=limit)
def statement_transaction_set(preferred_language: str, transaction_reprs: list):
"""
:param preferred_language:
:type preferred_language:
:param transaction_reprs:
:type transaction_reprs:
:return:
:rtype:
"""
if not transaction_reprs:
return translation_for('helpers.no_transaction_history', preferred_language)
return ''.join(f'{transaction_repr}\n' for transaction_repr in transaction_reprs)

View File

@@ -0,0 +1,61 @@
# standard imports
import json
import logging
from typing import Dict, Optional
# external imports
from cic_eth.api import Api
# local imports
from cic_ussd.account.chain import Chain
from cic_ussd.cache import cache_data_key, get_cached_data
from cic_ussd.error import SeppukuError
logg = logging.getLogger(__name__)
def get_cached_default_token(chain_str: str) -> Optional[str]:
"""This function attempts to retrieve the default token's data from the redis cache.
:param chain_str: chain name and network id.
:type chain_str: str
:return:
:rtype:
"""
logg.debug(f'Retrieving default token from cache for chain: {chain_str}')
key = cache_data_key(identifier=chain_str.encode('utf-8'), salt=':cic.default_token_data')
return get_cached_data(key=key)
def get_default_token_symbol():
"""This function attempts to retrieve the default token's symbol from cached default token's data.
:raises SeppukuError: The system should terminate itself because the default token is required for an appropriate
system state.
:return: Default token's symbol.
:rtype: str
"""
chain_str = Chain.spec.__str__()
cached_default_token = get_cached_default_token(chain_str)
if cached_default_token:
default_token_data = json.loads(cached_default_token)
return default_token_data.get('symbol')
else:
logg.warning('Cached default token data not found. Attempting retrieval from default token API')
default_token_data = query_default_token(chain_str)
if default_token_data:
return default_token_data.get('symbol')
else:
raise SeppukuError(f'Could not retrieve default token for: {chain_str}')
def query_default_token(chain_str: str):
"""This function synchronously queries cic-eth for the deployed system's default token.
:param chain_str: Chain name and network id.
:type chain_str: str
:return: Token's data.
:rtype: dict
"""
logg.debug(f'Querying API for default token on chain: {chain_str}')
cic_eth_api = Api(chain_str=chain_str)
default_token_request_task = cic_eth_api.default_token()
return default_token_request_task.get()

View File

@@ -0,0 +1,172 @@
# standard import
import decimal
import logging
from typing import Dict, Tuple
# external import
from cic_eth.api import Api
from sqlalchemy.orm.session import Session
# local import
from cic_ussd.db.models.account import Account
from cic_ussd.db.models.base import SessionBase
from cic_ussd.error import UnknownUssdRecipient
from cic_ussd.translation import translation_for
logg = logging.getLogger(__name__)
def _add_tags(action_tag_key: str, preferred_language: str, direction_tag_key: str, transaction: dict):
""" This function adds action and direction tags to a transaction data object.
:param action_tag_key: Key mapping to a helper entry in the translation files describing an action.
:type action_tag_key: str
:param preferred_language: An account's set preferred language.
:type preferred_language: str
:param direction_tag_key: Key mapping to a helper entry in the translation files describing a transaction's
direction relative to the transaction's subject account.
:type direction_tag_key: str
:param transaction: Parsed transaction data object.
:type transaction: dict
"""
action_tag = translation_for(action_tag_key, preferred_language)
direction_tag = translation_for(direction_tag_key, preferred_language)
transaction['action_tag'] = action_tag
transaction['direction_tag'] = direction_tag
def aux_transaction_data(preferred_language: str, transaction: dict) -> dict:
"""This function adds auxiliary data to a transaction object offering contextual information relative to the
subject account's role in the transaction.
:param preferred_language: An account's set preferred language.
:type preferred_language: str
:param transaction: Parsed transaction data object.
:type transaction: dict
:return: Transaction object with contextual data.
:rtype: dict
"""
role = transaction.get('role')
if role == 'recipient':
_add_tags('helpers.received', preferred_language, 'helpers.from', transaction)
if role == 'sender':
_add_tags('helpers.sent', preferred_language, 'helpers.to', transaction)
return transaction
def from_wei(value: int) -> float:
"""This function converts values in Wei to a token in the cic network.
:param value: Value in Wei
:type value: int
:return: SRF equivalent of value in Wei
:rtype: float
"""
value = float(value) / 1e+6
return truncate(value=value, decimals=2)
def to_wei(value: int) -> int:
"""This functions converts values from a token in the cic network to Wei.
:param value: Value in SRF
:type value: int
:return: Wei equivalent of value in SRF
:rtype: int
"""
return int(value * 1e+6)
def truncate(value: float, decimals: int):
"""This function truncates a value to a specified number of decimals places.
:param value: The value to be truncated.
:type value: float
:param decimals: The number of decimals for the value to be truncated to
:type decimals: int
:return: The truncated value.
:rtype: int
"""
decimal.getcontext().rounding = decimal.ROUND_DOWN
contextualized_value = decimal.Decimal(value)
return round(contextualized_value, decimals)
def transaction_actors(transaction: dict) -> Tuple[Dict, Dict]:
""" This function parses transaction data into a tuple of transaction data objects representative of
of the source and destination account's involved in a transaction.
:param transaction: Transaction data object.
:type transaction: dict
:return: Recipient and sender transaction data object
:rtype: Tuple[Dict, Dict]
"""
destination_token_symbol = transaction.get('destination_token_symbol')
destination_token_value = transaction.get('destination_token_value') or transaction.get('to_value')
recipient_blockchain_address = transaction.get('recipient')
sender_blockchain_address = transaction.get('sender')
source_token_symbol = transaction.get('source_token_symbol')
source_token_value = transaction.get('source_token_value') or transaction.get('from_value')
recipient_transaction_data = {
"token_symbol": destination_token_symbol,
"token_value": destination_token_value,
"blockchain_address": recipient_blockchain_address,
"role": "recipient",
}
sender_transaction_data = {
"blockchain_address": sender_blockchain_address,
"token_symbol": source_token_symbol,
"token_value": source_token_value,
"role": "sender",
}
return recipient_transaction_data, sender_transaction_data
def validate_transaction_account(session: Session, transaction: dict) -> Account:
"""This function checks whether the blockchain address specified in a parsed transaction object resolves to an
account object in the ussd system.
:param session: Database session object.
:type session: Session
:param transaction: Parsed transaction data object.
:type transaction: dict
:return:
:rtype:
"""
blockchain_address = transaction.get('blockchain_address')
role = transaction.get('role')
session = SessionBase.bind_session(session)
account = session.query(Account).filter_by(blockchain_address=blockchain_address).first()
if not account:
if role == 'recipient':
raise UnknownUssdRecipient(
f'Tx for recipient: {blockchain_address} has no matching account in the system.'
)
if role == 'sender':
logg.warning(f'Tx from sender: {blockchain_address} has no matching account in system.')
SessionBase.release_session(session)
return account
class OutgoingTransaction:
def __init__(self, chain_str: str, from_address: str, to_address: str):
"""
:param chain_str: The chain name and network id.
:type chain_str: str
:param from_address: Ethereum address of the sender
:type from_address: str, 0x-hex
:param to_address: Ethereum address of the recipient
:type to_address: str, 0x-hex
"""
self.chain_str = chain_str
self.cic_eth_api = Api(chain_str=chain_str)
self.from_address = from_address
self.to_address = to_address
def transfer(self, amount: int, token_symbol: str):
"""This function initiates standard transfers between one account to another
:param amount: The amount of tokens to be sent
:type amount: int
:param token_symbol: ERC20 token symbol of token to send
:type token_symbol: str
"""
self.cic_eth_api.transfer(from_address=self.from_address,
to_address=self.to_address,
value=to_wei(value=amount),
token_symbol=token_symbol)

View File

@@ -1,90 +0,0 @@
# standard imports
import json
import logging
from typing import Union
# third-party imports
import celery
from cic_eth.api import Api
# local imports
from cic_ussd.error import CachedDataNotFoundError
from cic_ussd.redis import create_cached_data_key, get_cached_data
from cic_ussd.conversions import from_wei
logg = logging.getLogger()
class BalanceManager:
def __init__(self, address: str, chain_str: str, token_symbol: str):
"""
:param address: Ethereum address of account whose balance is being queried
:type address: str, 0x-hex
:param chain_str: The chain name and network id.
:type chain_str: str
:param token_symbol: ERC20 token symbol of whose balance is being queried
:type token_symbol: str
"""
self.address = address
self.chain_str = chain_str
self.token_symbol = token_symbol
def get_balances(self, asynchronous: bool = False) -> Union[celery.Task, dict]:
"""
This function queries cic-eth for an account's balances, It provides a means to receive the balance either
asynchronously or synchronously depending on the provided value for teh asynchronous parameter. It returns a
dictionary containing network, outgoing and incoming balances.
:param asynchronous: Boolean value checking whether to return balances asynchronously
:type asynchronous: bool
:return:
:rtype:
"""
if asynchronous:
cic_eth_api = Api(
chain_str=self.chain_str,
callback_queue='cic-ussd',
callback_task='cic_ussd.tasks.callback_handler.process_balances_callback',
callback_param=''
)
cic_eth_api.balance(address=self.address, token_symbol=self.token_symbol)
else:
cic_eth_api = Api(chain_str=self.chain_str)
balance_request_task = cic_eth_api.balance(
address=self.address,
token_symbol=self.token_symbol)
return balance_request_task.get()[0]
def compute_operational_balance(balances: dict) -> float:
"""This function calculates the right balance given incoming and outgoing
:param balances:
:type balances:
:return:
:rtype:
"""
incoming_balance = balances.get('balance_incoming')
outgoing_balance = balances.get('balance_outgoing')
network_balance = balances.get('balance_network')
operational_balance = (network_balance + incoming_balance) - outgoing_balance
return from_wei(value=operational_balance)
def get_cached_operational_balance(blockchain_address: str):
"""
:param blockchain_address:
:type blockchain_address:
:return:
:rtype:
"""
key = create_cached_data_key(
identifier=bytes.fromhex(blockchain_address[2:]),
salt=':cic.balances_data'
)
cached_balance = get_cached_data(key=key)
if cached_balance:
operational_balance = compute_operational_balance(balances=json.loads(cached_balance))
return operational_balance
else:
raise CachedDataNotFoundError('Cached operational balance not found.')

View File

@@ -8,8 +8,8 @@ from redis import Redis
logg = logging.getLogger()
class InMemoryStore:
cache: Redis = None
class Cache:
store: Redis = None
def cache_data(key: str, data: str):
@@ -21,9 +21,10 @@ def cache_data(key: str, data: str):
:return:
:rtype:
"""
cache = InMemoryStore.cache
cache = Cache.store
cache.set(name=key, value=data)
cache.persist(name=key)
logg.debug(f'caching: {data} with key: {key}.')
def get_cached_data(key: str):
@@ -33,11 +34,11 @@ def get_cached_data(key: str):
:return:
:rtype:
"""
cache = InMemoryStore.cache
cache = Cache.store
return cache.get(name=key)
def create_cached_data_key(identifier: bytes, salt: str):
def cache_data_key(identifier: bytes, salt: str):
"""
:param identifier:
:type identifier:

View File

@@ -1,41 +0,0 @@
# standard imports
import decimal
# third-party imports
# local imports
def truncate(value: float, decimals: int):
"""This function truncates a value to a specified number of decimals places.
:param value: The value to be truncated.
:type value: float
:param decimals: The number of decimals for the value to be truncated to
:type decimals: int
:return: The truncated value.
:rtype: int
"""
decimal.getcontext().rounding = decimal.ROUND_DOWN
contextualized_value = decimal.Decimal(value)
return round(contextualized_value, decimals)
def from_wei(value: int) -> float:
"""This function converts values in Wei to a token in the cic network.
:param value: Value in Wei
:type value: int
:return: platform's default token equivalent of value in Wei
:rtype: float
"""
value = float(value) / 1e+6
return truncate(value=value, decimals=2)
def to_wei(value: int) -> int:
"""This functions converts values from a token in the cic network to Wei.
:param value: Value in platform's default token
:type value: int
:return: Wei equivalent of value in platform's default token
:rtype: int
"""
return int(value * 1e+6)

View File

@@ -26,7 +26,7 @@ def upgrade():
sa.Column('msisdn', sa.String(), nullable=False),
sa.Column('user_input', sa.String(), nullable=True),
sa.Column('state', sa.String(), nullable=False),
sa.Column('session_data', postgresql.JSON(astext_type=sa.Text()), nullable=True),
sa.Column('data', postgresql.JSON(astext_type=sa.Text()), nullable=True),
sa.Column('version', sa.Integer(), nullable=False),
sa.PrimaryKeyConstraint('id')
)

View File

@@ -24,7 +24,7 @@ def upgrade():
sa.Column('preferred_language', sa.String(), nullable=True),
sa.Column('password_hash', sa.String(), nullable=True),
sa.Column('failed_pin_attempts', sa.Integer(), nullable=False),
sa.Column('account_status', sa.Integer(), nullable=False),
sa.Column('status', sa.Integer(), nullable=False),
sa.Column('created', sa.DateTime(), nullable=False),
sa.Column('updated', sa.DateTime(), nullable=False),
sa.PrimaryKeyConstraint('id')

View File

@@ -1,11 +1,17 @@
# standard imports
import json
# external imports
from chainlib.hash import strip_0x
from cic_eth.api import Api
# local imports
from cic_ussd.account.metadata import get_cached_preferred_language, parse_account_metadata
from cic_ussd.cache import Cache, cache_data_key, get_cached_data
from cic_ussd.db.enum import AccountStatus
from cic_ussd.db.models.base import SessionBase
from cic_ussd.db.models.task_tracker import TaskTracker
from cic_ussd.encoder import check_password_hash, create_password_hash
# third party imports
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm.session import Session
@@ -21,9 +27,32 @@ class Account(SessionBase):
phone_number = Column(String)
password_hash = Column(String)
failed_pin_attempts = Column(Integer)
account_status = Column(Integer)
status = Column(Integer)
preferred_language = Column(String)
def __init__(self, blockchain_address, phone_number):
self.blockchain_address = blockchain_address
self.phone_number = phone_number
self.password_hash = None
self.failed_pin_attempts = 0
self.status = AccountStatus.PENDING.value
def __repr__(self):
return f'<Account: {self.blockchain_address}>'
def activate_account(self):
"""This method is used to reset failed pin attempts and change account status to Active."""
self.failed_pin_attempts = 0
self.status = AccountStatus.ACTIVE.value
def create_password(self, password):
"""This method takes a password value and hashes the value before assigning it to the corresponding
`hashed_password` attribute in the user record.
:param password: A password value
:type password: str
"""
self.password_hash = create_password_hash(password)
@staticmethod
def get_by_phone_number(phone_number: str, session: Session):
"""Retrieves an account from a phone number.
@@ -39,23 +68,68 @@ class Account(SessionBase):
SessionBase.release_session(session=session)
return account
def __init__(self, blockchain_address, phone_number):
self.blockchain_address = blockchain_address
self.phone_number = phone_number
self.password_hash = None
self.failed_pin_attempts = 0
self.account_status = AccountStatus.PENDING.value
def has_preferred_language(self) -> bool:
return get_cached_preferred_language(self.blockchain_address) is not None
def __repr__(self):
return f'<Account: {self.blockchain_address}>'
def create_password(self, password):
"""This method takes a password value and hashes the value before assigning it to the corresponding
`hashed_password` attribute in the user record.
:param password: A password value
:type password: str
def has_valid_pin(self, session: Session):
"""
self.password_hash = create_password_hash(password)
:param session:
:type session:
:return:
:rtype:
"""
return self.get_status(session) == AccountStatus.ACTIVE.name and self.password_hash is not None
def pin_is_blocked(self, session: Session) -> bool:
"""
:param session:
:type session:
:return:
:rtype:
"""
return self.failed_pin_attempts == 3 and self.get_status(session) == AccountStatus.LOCKED.name
def reset_pin(self, session: Session) -> str:
"""This function resets the number of failed pin attempts to zero. It places the account in pin reset status
enabling users to reset their pins.
:param session: Database session object.
:type session: Session
"""
session = SessionBase.bind_session(session=session)
self.failed_pin_attempts = 0
self.status = AccountStatus.RESET.value
session.add(self)
session.flush()
SessionBase.release_session(session=session)
return f'Pin reset successful.'
def standard_metadata_id(self) -> str:
"""This function creates an account's standard metadata identification information that contains an account owner's
given name, family name and phone number and defaults to a phone number in the absence of metadata.
:return: Standard metadata identification information | e164 formatted phone number.
:rtype: str
"""
identifier = bytes.fromhex(strip_0x(self.blockchain_address))
key = cache_data_key(identifier, ':cic.person')
account_metadata = get_cached_data(key)
if not account_metadata:
return self.phone_number
account_metadata = json.loads(account_metadata)
return parse_account_metadata(account_metadata)
def get_status(self, session: Session):
"""This function handles account status queries, it checks whether an account's failed pin attempts exceed 2 and
updates the account status locked, it then returns the account status
:return: The account status for a user object
:rtype: str
"""
session = SessionBase.bind_session(session=session)
if self.failed_pin_attempts > 2:
self.status = AccountStatus.LOCKED.value
session.add(self)
session.flush()
SessionBase.release_session(session=session)
return AccountStatus(self.status).name
def verify_password(self, password):
"""This method takes a password value and compares it to the user's corresponding `hashed_password` value to
@@ -67,33 +141,41 @@ class Account(SessionBase):
"""
return check_password_hash(password, self.password_hash)
def reset_account_pin(self):
"""This method is used to unlock a user's account."""
self.failed_pin_attempts = 0
self.account_status = AccountStatus.RESET.value
def get_account_status(self):
"""This method checks whether the account is past the allowed number of failed pin attempts.
If so, it changes the accounts status to Locked.
:return: The account status for a user object
:rtype: str
"""
if self.failed_pin_attempts > 2:
self.account_status = AccountStatus.LOCKED.value
return AccountStatus(self.account_status).name
def create(chain_str: str, phone_number: str, session: Session):
"""
:param chain_str:
:type chain_str:
:param phone_number:
:type phone_number:
:param session:
:type session:
:return:
:rtype:
"""
api = Api(callback_task='cic_ussd.tasks.callback_handler.account_creation_callback',
callback_queue='cic-ussd',
callback_param='',
chain_str=chain_str)
task_uuid = api.create_account().id
TaskTracker.add(session=session, task_uuid=task_uuid)
cache_creation_task_uuid(phone_number=phone_number, task_uuid=task_uuid)
def activate_account(self):
"""This method is used to reset failed pin attempts and change account status to Active."""
self.failed_pin_attempts = 0
self.account_status = AccountStatus.ACTIVE.value
def has_valid_pin(self):
"""This method checks whether the user's account status and if a pin hash is present which implies
pin validity.
:return: The presence of a valid pin and status of the account being active.
:rtype: bool
"""
valid_pin = None
if self.get_account_status() == 'ACTIVE' and self.password_hash is not None:
valid_pin = True
return valid_pin
def cache_creation_task_uuid(phone_number: str, task_uuid: str):
"""This function stores the task id that is returned from a task spawned to create a blockchain account in the redis
cache.
:param phone_number: The phone number for the user whose account is being created.
:type phone_number: str
:param task_uuid: A celery task id
:type task_uuid: str
"""
cache = Cache.store
account_creation_request_data = {
'phone_number': phone_number,
'sms_notification_sent': False,
'status': 'PENDING',
'task_uuid': task_uuid
}
cache.set(task_uuid, json.dumps(account_creation_request_data))
cache.persist(name=task_uuid)

View File

@@ -8,11 +8,11 @@ from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import (
StaticPool,
QueuePool,
AssertionPool,
NullPool,
)
StaticPool,
QueuePool,
AssertionPool,
NullPool,
)
logg = logging.getLogger().getChild(__name__)
@@ -42,14 +42,12 @@ class SessionBase(Model):
localsessions = {}
"""Contains dictionary of sessions initiated by db model components"""
@staticmethod
def create_session():
"""Creates a new database session.
"""
return SessionBase.sessionmaker()
@staticmethod
def _set_engine(engine):
"""Sets the database engine static property
@@ -57,7 +55,6 @@ class SessionBase(Model):
SessionBase.engine = engine
SessionBase.sessionmaker = sessionmaker(bind=SessionBase.engine)
@staticmethod
def connect(dsn, pool_size=16, debug=False):
"""Create new database connection engine and connect to database backend.
@@ -71,14 +68,14 @@ class SessionBase(Model):
if pool_size > 1:
logg.info('db using queue pool')
e = create_engine(
dsn,
max_overflow=pool_size*3,
pool_pre_ping=True,
pool_size=pool_size,
pool_recycle=60,
poolclass=poolclass,
echo=debug,
)
dsn,
max_overflow=pool_size * 3,
pool_pre_ping=True,
pool_size=pool_size,
pool_recycle=60,
poolclass=poolclass,
echo=debug,
)
else:
if pool_size == 0:
poolclass = NullPool
@@ -87,20 +84,19 @@ class SessionBase(Model):
else:
poolclass = StaticPool
e = create_engine(
dsn,
poolclass=poolclass,
echo=debug,
)
dsn,
poolclass=poolclass,
echo=debug,
)
else:
logg.info('db connection not poolable')
e = create_engine(
dsn,
echo=debug,
)
dsn,
echo=debug,
)
SessionBase._set_engine(e)
@staticmethod
def disconnect():
"""Disconnect from database and free resources.
@@ -108,18 +104,16 @@ class SessionBase(Model):
SessionBase.engine.dispose()
SessionBase.engine = None
@staticmethod
def bind_session(session=None):
localsession = session
if localsession == None:
if localsession is None:
localsession = SessionBase.create_session()
localsession_key = str(id(localsession))
logg.debug('creating new session {}'.format(localsession_key))
SessionBase.localsessions[localsession_key] = localsession
return localsession
@staticmethod
def release_session(session=None):
session_key = str(id(session))
@@ -127,3 +121,4 @@ class SessionBase(Model):
logg.debug('commit and destroy session {}'.format(session_key))
session.commit()
session.close()
del SessionBase.localsessions[session_key]

View File

@@ -3,6 +3,7 @@ import logging
# third-party imports
from sqlalchemy import Column, String
from sqlalchemy.orm.session import Session
# local imports
from cic_ussd.db.models.base import SessionBase
@@ -17,3 +18,17 @@ class TaskTracker(SessionBase):
self.task_uuid = task_uuid
task_uuid = Column(String, nullable=False)
@staticmethod
def add(session: Session, task_uuid: str):
"""This function persists celery tasks uuids to storage.
:param session: Database session object.
:type session: Session
:param task_uuid: The uuid for an initiated task.
:type task_uuid: str
"""
session = SessionBase.bind_session(session=session)
task_record = TaskTracker(task_uuid=task_uuid)
session.add(task_record)
session.flush()
SessionBase.release_session(session=session)

View File

@@ -2,9 +2,10 @@
import logging
# third-party imports
from sqlalchemy import Column, String, Integer
from sqlalchemy import Column, desc, Integer, String
from sqlalchemy.dialects.postgresql import JSON
from sqlalchemy.orm.attributes import flag_modified
from sqlalchemy.orm.session import Session
# local imports
from cic_ussd.db.models.base import SessionBase
@@ -16,26 +17,26 @@ logg = logging.getLogger(__name__)
class UssdSession(SessionBase):
__tablename__ = 'ussd_session'
data = Column(JSON)
external_session_id = Column(String, nullable=False, index=True, unique=True)
service_code = Column(String, nullable=False)
msisdn = Column(String, nullable=False)
user_input = Column(String)
service_code = Column(String, nullable=False)
state = Column(String, nullable=False)
session_data = Column(JSON)
user_input = Column(String)
version = Column(Integer, nullable=False)
def set_data(self, key, session, value):
if self.session_data is None:
self.session_data = {}
self.session_data[key] = value
if self.data is None:
self.data = {}
self.data[key] = value
# https://stackoverflow.com/questions/42559434/updates-to-json-field-dont-persist-to-db
flag_modified(self, "session_data")
flag_modified(self, "data")
session.add(self)
def get_data(self, key):
if self.session_data is not None:
return self.session_data.get(key)
if self.data is not None:
return self.data.get(key)
else:
return None
@@ -51,9 +52,37 @@ class UssdSession(SessionBase):
session.add(self)
@staticmethod
def have_session_for_phone(phone):
r = UssdSession.session.query(UssdSession).filter_by(msisdn=phone).first()
return r is not None
def has_record_for_phone_number(phone_number: str, session: Session):
"""
:param phone_number:
:type phone_number:
:param session:
:type session:
:return:
:rtype:
"""
session = SessionBase.bind_session(session=session)
ussd_session = session.query(UssdSession).filter_by(msisdn=phone_number).first()
SessionBase.release_session(session=session)
return ussd_session is not None
@staticmethod
def last_ussd_session(phone_number: str, session: Session):
"""
:param phone_number:
:type phone_number:
:param session:
:type session:
:return:
:rtype:
"""
session = SessionBase.bind_session(session=session)
ussd_session = session.query(UssdSession) \
.filter_by(msisdn=phone_number) \
.order_by(desc(UssdSession.created)) \
.first()
SessionBase.release_session(session=session)
return ussd_session
def to_json(self):
""" This function serializes the in db ussd session object to a JSON object
@@ -61,11 +90,11 @@ class UssdSession(SessionBase):
:rtype: dict
"""
return {
"data": self.data,
"external_session_id": self.external_session_id,
"service_code": self.service_code,
"msisdn": self.msisdn,
"user_input": self.user_input,
"service_code": self.service_code,
"state": self.state,
"session_data": self.session_data,
"user_input": self.user_input,
"version": self.version
}

View File

@@ -8,27 +8,27 @@ class SessionNotFoundError(Exception):
pass
class InvalidFileFormatError(OSError):
class InvalidFileFormatError(Exception):
"""Raised when the file format is invalid."""
pass
class ActionDataNotFoundError(OSError):
"""Raised when action data matching a specific task uuid is not found in the redis cache"""
class AccountCreationDataNotFound(Exception):
"""Raised when account creation data matching a specific task uuid is not found in the redis cache"""
pass
class MetadataNotFoundError(OSError):
class MetadataNotFoundError(Exception):
"""Raised when metadata is expected but not available in cache."""
pass
class UnsupportedMethodError(OSError):
class UnsupportedMethodError(Exception):
"""Raised when the method passed to the make request function is unsupported."""
pass
class CachedDataNotFoundError(OSError):
class CachedDataNotFoundError(Exception):
"""Raised when the method passed to the make request function is unsupported."""
pass
@@ -48,3 +48,8 @@ class InitializationError(Exception):
pass
class UnknownUssdRecipient(Exception):
"""Raised when a recipient of a transaction is not known to the ussd application."""

View File

View File

@@ -0,0 +1,65 @@
# standard imports
import logging
from typing import Optional, Union
from urllib.parse import urlparse, parse_qs
# external imports
import requests
from requests.exceptions import HTTPError
# local imports
from cic_ussd.error import UnsupportedMethodError
logg = logging.getLogger(__file__)
def error_handler(result: requests.Response):
""""""
status_code = result.status_code
if 100 <= status_code < 200:
raise HTTPError(f'Informational errors: {status_code}, reason: {result.reason}')
elif 300 <= status_code < 400:
raise HTTPError(f'Redirect Issues: {status_code}, reason: {result.reason}')
elif 400 <= status_code < 500:
raise HTTPError(f'Client Error: {status_code}, reason: {result.reason}')
elif 500 <= status_code < 600:
raise HTTPError(f'Server Error: {status_code}, reason: {result.reason}')
def get_query_parameters(env: dict, query_name: Optional[str] = None) -> Union[dict, str]:
""""""
parsed_url = urlparse(env.get('REQUEST_URI'))
params = parse_qs(parsed_url.query)
if query_name:
return params.get(query_name)[0]
return params
def get_request_endpoint(env: dict) -> str:
""""""
return env.get('PATH_INFO')
def get_request_method(env: dict) -> str:
""""""
return env.get('REQUEST_METHOD').upper()
def make_request(method: str, url: str, data: any = None, headers: dict = None):
""""""
if method == 'GET':
logg.debug(f'Retrieving data from: {url}')
result = requests.get(url=url)
elif method == 'POST':
logg.debug(f'Posting to: {url} with: {data}')
result = requests.post(url=url, data=data, headers=headers)
elif method == 'PUT':
logg.debug(f'Putting to: {url} with: {data}')
result = requests.put(url=url, data=data, headers=headers)
else:
raise UnsupportedMethodError(f'Unsupported method: {method}')
return result

View File

@@ -0,0 +1,26 @@
# standard imports
import logging
from typing import Tuple
# external imports
# local imports
def with_content_headers(headers: list, response: str) -> Tuple[bytes, list]:
"""This function calculates the length of a http response body and appends the content length to the headers.
:param headers: A list of tuples defining headers for responses.
:type headers: list
:param response: The response to send for an incoming http request
:type response: str
:return: A tuple containing the response bytes and a list of tuples defining headers
:rtype: tuple
"""
response_bytes = response.encode('utf-8')
content_length = len(response_bytes)
content_length_header = ('Content-Length', str(content_length))
for position, header in enumerate(headers):
if 'Content-Length' in header:
headers.pop(position)
headers.append(content_length_header)
return response_bytes, headers

View File

@@ -0,0 +1,87 @@
# standard imports
import json
import logging
import re
from urllib.parse import quote_plus
# external imports
from sqlalchemy import desc
from sqlalchemy.orm.session import Session
# local imports
from cic_ussd.db.enum import AccountStatus
from cic_ussd.db.models.account import Account
from cic_ussd.db.models.base import SessionBase
from cic_ussd.http.requests import get_query_parameters, get_request_method
from cic_ussd.http.responses import with_content_headers
logg = logging.getLogger(__file__)
def _get_locked_accounts(env: dict, session: Session):
offset = 0
limit = 100
locked_accounts_path = r'/accounts/locked/(\d+)?/?(\d+)?'
r = re.match(locked_accounts_path, env.get('PATH_INFO'))
if r:
if r.lastindex > 1:
offset = r[1]
limit = r[2]
else:
limit = r[1]
session = SessionBase.bind_session(session)
accounts = session.query(Account.blockchain_address)\
.filter(Account.status == AccountStatus.LOCKED.value, Account.failed_pin_attempts >= 3)\
.order_by(desc(Account.updated))\
.offset(offset)\
.limit(limit)\
.all()
accounts = [blockchain_address for (blockchain_address,) in accounts]
SessionBase.release_session(session=session)
response = json.dumps(accounts)
return response, '200 OK'
def locked_accounts(env: dict, session: Session) -> tuple:
"""
:param env:
:type env:
:param session:
:type session:
:return:
:rtype:
"""
if get_request_method(env) == 'GET':
return _get_locked_accounts(env, session)
return '', '405 Play by the rules'
def pin_reset(env: dict, phone_number: str, session: Session):
""""""
account = session.query(Account).filter_by(phone_number=phone_number).first()
if not account:
return '', '404 Not found'
if get_request_method(env) == 'PUT':
return account.reset_pin(session), '200 OK'
if get_request_method(env) == 'GET':
status = account.get_status(session)
response = {
'status': f'{status}'
}
response = json.dumps(response)
return response, '200 OK'
def handle_pin_requests(env, session, errors_headers, start_response):
phone_number = get_query_parameters(env=env, query_name='phoneNumber')
phone_number = quote_plus(phone_number)
response, message = pin_reset(env=env, phone_number=phone_number, session=session)
response_bytes, headers = with_content_headers(errors_headers, response)
session.commit()
session.close()
start_response(message, headers)
return [response_bytes]

View File

@@ -65,11 +65,10 @@ class UssdMenu:
:rtype: Document.
"""
menu = UssdMenu.ussd_menu_db.get(UssdMenu.Menu.name == name)
if not menu:
logg.error("No USSD Menu with name {}".format(name))
return UssdMenu.ussd_menu_db.get(UssdMenu.Menu.name == 'exit_invalid_request')
else:
if menu:
return menu
logg.error("No USSD Menu with name {}".format(name))
return UssdMenu.ussd_menu_db.get(UssdMenu.Menu.name == 'exit_invalid_request')
@staticmethod
def set_description(name: str, description: str):

View File

@@ -1,46 +1,10 @@
# standard imports
# third-party imports
import requests
from chainlib.eth.address import to_checksum
from hexathon import (
add_0x,
strip_0x,
)
# external imports
# local imports
from cic_ussd.error import UnsupportedMethodError
def make_request(method: str, url: str, data: any = None, headers: dict = None):
"""
:param method:
:type method:
:param url:
:type url:
:param data:
:type data:
:param headers:
:type headers:
:return:
:rtype:
"""
if method == 'GET':
result = requests.get(url=url)
elif method == 'POST':
result = requests.post(url=url, data=data, headers=headers)
elif method == 'PUT':
result = requests.put(url=url, data=data, headers=headers)
else:
raise UnsupportedMethodError(f'Unsupported method: {method}')
return result
def blockchain_address_to_metadata_pointer(blockchain_address: str):
"""
:param blockchain_address:
:type blockchain_address:
:return:
:rtype:
"""
return bytes.fromhex(strip_0x(blockchain_address))
from .base import Metadata
from .custom import CustomMetadata
from .person import PersonMetadata
from .phone import PhonePointerMetadata
from .preferences import PreferencesMetadata

View File

@@ -5,17 +5,14 @@ import os
from typing import Dict, Union
# third-part imports
import requests
from cic_types.models.person import generate_metadata_pointer, Person
# local imports
from cic_ussd.metadata import make_request
from cic_ussd.cache import cache_data, get_cached_data
from cic_ussd.http.requests import error_handler, make_request
from cic_ussd.metadata.signer import Signer
from cic_ussd.redis import cache_data
from cic_ussd.error import MetadataStoreError
logg = logging.getLogger().getChild(__name__)
logg = logging.getLogger(__file__)
class Metadata:
@@ -27,37 +24,10 @@ class Metadata:
base_url = None
def metadata_http_error_handler(result: requests.Response):
""" This function handles and appropriately raises errors from http requests interacting with the metadata server.
:param result: The response object from a http request.
:type result: requests.Response
"""
status_code = result.status_code
if 100 <= status_code < 200:
raise MetadataStoreError(f'Informational errors: {status_code}, reason: {result.reason}')
elif 300 <= status_code < 400:
raise MetadataStoreError(f'Redirect Issues: {status_code}, reason: {result.reason}')
elif 400 <= status_code < 500:
raise MetadataStoreError(f'Client Error: {status_code}, reason: {result.reason}')
elif 500 <= status_code < 600:
raise MetadataStoreError(f'Server Error: {status_code}, reason: {result.reason}')
class MetadataRequestsHandler(Metadata):
def __init__(self, cic_type: str, identifier: bytes, engine: str = 'pgp'):
"""
:param cic_type: The salt value with which to hash a specific metadata identifier.
:type cic_type: str
:param engine: Encryption used for sending data to the metadata server.
:type engine: str
:param identifier: A unique element of data in bytes necessary for creating a metadata pointer.
:type identifier: bytes
"""
""""""
self.cic_type = cic_type
self.engine = engine
self.headers = {
@@ -73,22 +43,16 @@ class MetadataRequestsHandler(Metadata):
self.url = os.path.join(self.base_url, self.metadata_pointer)
def create(self, data: Union[Dict, str]):
""" This function is responsible for posting data to the metadata server with a corresponding metadata pointer
for storage.
:param data: The data to be stored in the metadata server.
:type data: dict|str
"""
""""""
data = json.dumps(data)
result = make_request(method='POST', url=self.url, data=data, headers=self.headers)
metadata_http_error_handler(result=result)
error_handler(result=result)
metadata = result.json()
self.edit(data=metadata)
return self.edit(data=metadata)
def edit(self, data: Union[Dict, str]):
""" This function is responsible for editing data in the metadata server corresponding to a unique pointer.
:param data: The data to be edited in the metadata server.
:type data: dict
"""
""""""
cic_meta_signer = Signer()
signature = cic_meta_signer.sign_digest(data=data)
algorithm = cic_meta_signer.get_operational_key().get('algo')
@@ -104,37 +68,34 @@ class MetadataRequestsHandler(Metadata):
formatted_data = json.dumps(formatted_data)
result = make_request(method='PUT', url=self.url, data=formatted_data, headers=self.headers)
logg.info(f'signed metadata submission status: {result.status_code}.')
metadata_http_error_handler(result=result)
error_handler(result=result)
try:
decoded_identifier = self.identifier.decode("utf-8")
except UnicodeDecodeError:
decoded_identifier = self.identifier.hex()
logg.info(f'identifier: {decoded_identifier}. metadata pointer: {self.metadata_pointer} set to: {data}.')
return result
def query(self):
"""
:return:
:rtype:
"""
# retrieve the metadata
""""""
result = make_request(method='GET', url=self.url)
metadata_http_error_handler(result=result)
# json serialize retrieved data
error_handler(result=result)
result_data = result.json()
# validate result data format
if not isinstance(result_data, dict):
raise ValueError(f'Invalid result data object: {result_data}.')
if result.status_code == 200 and self.cic_type == ':cic.person':
# validate person metadata
person = Person()
person_data = person.deserialize(person_data=result_data)
# format new person data for caching
data = json.dumps(person_data.serialize())
# cache metadata
if result.status_code == 200:
if self.cic_type == ':cic.person':
person = Person()
person_data = person.deserialize(person_data=result_data)
serialized_person_data = person_data.serialize()
data = json.dumps(serialized_person_data)
else:
data = json.dumps(result_data)
cache_data(key=self.metadata_pointer, data=data)
logg.debug(f'caching: {data} with key: {self.metadata_pointer}')
return result_data
def get_cached_metadata(self):
""""""
key = generate_metadata_pointer(self.identifier, self.cic_type)
return get_cached_data(key)

View File

@@ -1,6 +1,6 @@
# standard imports
# third-party imports
# external imports
# local imports
from .base import MetadataRequestsHandler

View File

@@ -1,6 +1,7 @@
# standard imports
# external imports
import celery
# local imports
from .base import MetadataRequestsHandler

View File

@@ -29,10 +29,8 @@ class Signer:
def __init__(self):
self.gpg = gnupg.GPG(gnupghome=self.gpg_path)
# parse key file data
key_file = open(self.key_file_path, 'r')
self.key_data = key_file.read()
key_file.close()
with open(self.key_file_path, 'r') as key_file:
self.key_data = key_file.read()
def get_operational_key(self):
"""

View File

@@ -21,9 +21,6 @@ class Notifier:
:param preferred_language: A notification recipient's preferred language.
:type preferred_language: str
"""
if self.queue is False:
notify_api = Api()
else:
notify_api = Api(queue=self.queue)
notify_api = Api() if self.queue is False else Api(queue=self.queue)
message = translation_for(key=key, preferred_language=preferred_language, **kwargs)
notify_api.sms(recipient=phone_number, message=message)

View File

@@ -1,521 +0,0 @@
# standard imports
import json
import logging
# third party imports
import celery
import i18n
from cic_eth.api.api_task import Api
from sqlalchemy.orm.session import Session
from tinydb.table import Document
from typing import Optional
# local imports
from cic_ussd.db.models.account import Account
from cic_ussd.db.models.base import SessionBase
from cic_ussd.db.models.ussd_session import UssdSession
from cic_ussd.db.models.task_tracker import TaskTracker
from cic_ussd.menu.ussd_menu import UssdMenu
from cic_ussd.processor import custom_display_text, process_request, retrieve_most_recent_ussd_session
from cic_ussd.redis import InMemoryStore
from cic_ussd.session.ussd_session import UssdSession as InMemoryUssdSession
from cic_ussd.validator import check_known_user, validate_response_type
logg = logging.getLogger()
def add_tasks_to_tracker(session, task_uuid: str):
"""This function takes tasks spawned over api interfaces and records their creation time for tracking.
:param session:
:type session:
:param task_uuid: The uuid for an initiated task.
:type task_uuid: str
"""
session = SessionBase.bind_session(session=session)
task_record = TaskTracker(task_uuid=task_uuid)
session.add(task_record)
session.flush()
SessionBase.release_session(session=session)
def define_response_with_content(headers: list, response: str) -> tuple:
"""This function encodes responses to byte form in order to make feasible for uwsgi response formats. It then
computes the length of the response and appends the content length to the headers.
:param headers: A list of tuples defining headers for responses.
:type headers: list
:param response: The response to send for an incoming http request
:type response: str
:return: A tuple containing the response bytes and a list of tuples defining headers
:rtype: tuple
"""
response_bytes = response.encode('utf-8')
content_length = len(response_bytes)
content_length_header = ('Content-Length', str(content_length))
# check for content length defaulted to zero in error headers
for position, header in enumerate(headers):
if 'Content-Length' in header:
headers.pop(position)
headers.append(content_length_header)
return response_bytes, headers
def create_ussd_session(
external_session_id: str,
phone: str,
service_code: str,
user_input: str,
current_menu: str,
session_data: Optional[dict] = None) -> InMemoryUssdSession:
"""
Creates a new ussd session
:param external_session_id: Session id value provided by AT
:type external_session_id: str
:param phone: A valid phone number
:type phone: str
:param service_code: service code passed over request
:type service_code AT service code
:param user_input: Input from the request
:type user_input: str
:param current_menu: Menu name that is currently being displayed on the ussd session
:type current_menu: str
:param session_data: Any additional data that was persisted during the user's interaction with the system.
:type session_data: dict.
:return: ussd session object
:rtype: Session
"""
session = InMemoryUssdSession(
external_session_id=external_session_id,
msisdn=phone,
user_input=user_input,
state=current_menu,
service_code=service_code,
session_data=session_data
)
return session
def create_or_update_session(
external_session_id: str,
phone: str,
service_code: str,
user_input: str,
current_menu: str,
session,
session_data: Optional[dict] = None) -> InMemoryUssdSession:
"""
Handles the creation or updating of session as necessary.
:param external_session_id: Session id value provided by AT
:type external_session_id: str
:param phone: A valid phone number
:type phone: str
:param service_code: service code passed over request
:type service_code: AT service code
:param user_input: input from the request
:type user_input: str
:param current_menu: Menu name that is currently being displayed on the ussd session
:type current_menu: str
:param session:
:type session:
:param session_data: Any additional data that was persisted during the user's interaction with the system.
:type session_data: dict.
:return: ussd session object
:rtype: InMemoryUssdSession
"""
session = SessionBase.bind_session(session=session)
existing_ussd_session = session.query(UssdSession).filter_by(
external_session_id=external_session_id).first()
if existing_ussd_session:
ussd_session = update_ussd_session(
ussd_session=existing_ussd_session,
current_menu=current_menu,
user_input=user_input,
session_data=session_data
)
else:
ussd_session = create_ussd_session(
external_session_id=external_session_id,
phone=phone,
service_code=service_code,
user_input=user_input,
current_menu=current_menu,
session_data=session_data
)
SessionBase.release_session(session=session)
return ussd_session
def get_account_status(phone_number, session: Session) -> str:
"""Get the status of a user's account.
:param phone_number: The phone number to be checked.
:type phone_number: str
:param session:
:type session:
:return: The user account status.
:rtype: str
"""
session = SessionBase.bind_session(session=session)
account = Account.get_by_phone_number(phone_number=phone_number, session=session)
status = account.get_account_status()
session.add(account)
session.flush()
SessionBase.release_session(session=session)
return status
def get_latest_input(user_input: str) -> str:
"""This function gets the last value entered by the user from the collective user input which follows the pattern of
asterix (*) separated entries.
:param user_input: The data entered by a user.
:type user_input: str
:return: The last element in the user input value.
:rtype: str
"""
return user_input.split('*')[-1]
def initiate_account_creation_request(chain_str: str,
external_session_id: str,
phone_number: str,
service_code: str,
session,
user_input: str) -> str:
"""This function issues a task to create a blockchain account on cic-eth. It then creates a record of the ussd
session corresponding to the creation of the account and returns a response denoting that the user's account is
being created.
:param chain_str: The chain name and network id.
:type chain_str: str
:param external_session_id: A unique ID from africastalking.
:type external_session_id: str
:param phone_number: The phone number for the account to be created.
:type phone_number: str
:param service_code: The service code dialed.
:type service_code: str
:param session:
:type session:
:param user_input: The input entered by the user.
:type user_input: str
:return: A response denoting that the account is being created.
:rtype: str
"""
# attempt to create a user
cic_eth_api = Api(callback_task='cic_ussd.tasks.callback_handler.process_account_creation_callback',
callback_queue='cic-ussd',
callback_param='',
chain_str=chain_str)
creation_task_id = cic_eth_api.create_account().id
# record task initiation time
add_tasks_to_tracker(task_uuid=creation_task_id, session=session)
# cache account creation data
cache_account_creation_task_id(phone_number=phone_number, task_id=creation_task_id)
# find menu to notify user account is being created
current_menu = UssdMenu.find_by_name(name='account_creation_prompt')
# create a ussd session session
create_or_update_session(
external_session_id=external_session_id,
phone=phone_number,
service_code=service_code,
current_menu=current_menu.get('name'),
session=session,
user_input=user_input)
# define response to relay to user
response = define_multilingual_responses(
key='ussd.kenya.account_creation_prompt', locales=['en', 'sw'], prefix='END')
return response
def define_multilingual_responses(key: str, locales: list, prefix: str, **kwargs):
"""This function returns responses in multiple languages in the interest of enabling responses in more than one
language.
:param key: The key to access some text value from the translation files.
:type key: str
:param locales: A list of the locales to translate the text value to.
:type locales: list
:param prefix: The prefix for the text value either: (CON|END)
:type prefix: str
:param kwargs: Other arguments to be passed to the translator
:type kwargs: kwargs
:return: A string of the text value in multiple languages.
:rtype: str
"""
prefix = prefix.upper()
response = f'{prefix} '
for locale in locales:
response += i18n.t(key=key, locale=locale, **kwargs)
response += '\n'
return response
def persist_session_to_db_task(external_session_id: str, queue: str):
"""
This function creates a signature matching the persist session to db task and runs the task asynchronously.
:param external_session_id: Session id value provided by AT
:type external_session_id: str
:param queue: Celery queue on which task should run
:type queue: str
"""
s_persist_session_to_db = celery.signature(
'cic_ussd.tasks.ussd_session.persist_session_to_db',
[external_session_id]
)
s_persist_session_to_db.apply_async(queue=queue)
def cache_account_creation_task_id(phone_number: str, task_id: str):
"""This function stores the task id that is returned from a task spawned to create a blockchain account in the redis
cache.
:param phone_number: The phone number for the user whose account is being created.
:type phone_number: str
:param task_id: A celery task id
:type task_id: str
"""
redis_cache = InMemoryStore.cache
account_creation_request_data = {
'phone_number': phone_number,
'sms_notification_sent': False,
'status': 'PENDING',
'task_id': task_id,
}
redis_cache.set(task_id, json.dumps(account_creation_request_data))
redis_cache.persist(name=task_id)
def process_current_menu(account: Account, session: Session, ussd_session: Optional[dict], user_input: str) -> Document:
"""This function checks user input and returns a corresponding ussd menu
:param ussd_session: An in db ussd session object.
:type ussd_session: UssdSession
:param account: A account object.
:type account: Account
:param session:
:type session:
:param user_input: The user's input.
:type user_input: str
:return: An in memory ussd menu object.
:rtype: Document
"""
# handle invalid inputs
if ussd_session and user_input == "":
current_menu = UssdMenu.find_by_name(name='exit_invalid_input')
else:
# get current state
latest_input = get_latest_input(user_input=user_input)
session = SessionBase.bind_session(session=session)
current_menu = process_request(
account=account,
session=session,
ussd_session=ussd_session,
user_input=latest_input)
SessionBase.release_session(session=session)
return current_menu
def process_menu_interaction_requests(chain_str: str,
external_session_id: str,
phone_number: str,
queue: str,
service_code: str,
session,
user_input: str) -> str:
"""This function handles requests intended for interaction with ussd menu, it checks whether a user matching the
provided phone number exists and in the absence of which it creates an account for the user.
In the event that a user exists it processes the request and returns an appropriate response.
:param chain_str: The chain name and network id.
:type chain_str: str
:param external_session_id: Unique session id from AfricasTalking
:type external_session_id: str
:param phone_number: Phone number of the user making the request.
:type phone_number: str
:param queue: The celery queue on which to run tasks
:type queue: str
:param service_code: The service dialed by the user making the request.
:type service_code: str
:param session:
:type session:
:param user_input: The inputs entered by the user.
:type user_input: str
:return: A response based on the request received.
:rtype: str
"""
# check whether the user exists
if not check_known_user(phone_number=phone_number, session=session):
response = initiate_account_creation_request(chain_str=chain_str,
external_session_id=external_session_id,
phone_number=phone_number,
service_code=service_code,
session=session,
user_input=user_input)
else:
# get account
session = SessionBase.bind_session(session=session)
account = Account.get_by_phone_number(phone_number=phone_number, session=session)
# retrieve and cache user's metadata
blockchain_address = account.blockchain_address
s_query_person_metadata = celery.signature(
'cic_ussd.tasks.metadata.query_person_metadata',
[blockchain_address]
)
s_query_person_metadata.apply_async(queue='cic-ussd')
# find any existing ussd session
existing_ussd_session = session.query(UssdSession).filter_by(external_session_id=external_session_id).first()
# validate user inputs
if existing_ussd_session:
current_menu = process_current_menu(
account=account,
session=session,
ussd_session=existing_ussd_session.to_json(),
user_input=user_input
)
else:
current_menu = process_current_menu(
account=account,
session=session,
ussd_session=None,
user_input=user_input
)
last_ussd_session = retrieve_most_recent_ussd_session(phone_number=account.phone_number, session=session)
if last_ussd_session:
# create or update the ussd session as appropriate
ussd_session = create_or_update_session(
external_session_id=external_session_id,
phone=phone_number,
service_code=service_code,
user_input=user_input,
current_menu=current_menu.get('name'),
session=session,
session_data=last_ussd_session.session_data
)
else:
ussd_session = create_or_update_session(
external_session_id=external_session_id,
phone=phone_number,
service_code=service_code,
user_input=user_input,
current_menu=current_menu.get('name'),
session=session
)
# define appropriate response
response = custom_display_text(
account=account,
display_key=current_menu.get('display_key'),
menu_name=current_menu.get('name'),
session=session,
ussd_session=ussd_session.to_json(),
)
# check that the response from the processor is valid
if not validate_response_type(processor_response=response):
raise Exception(f'Invalid response: {response}')
# persist session to db
persist_session_to_db_task(external_session_id=external_session_id, queue=queue)
SessionBase.release_session(session=session)
return response
def reset_pin(phone_number: str, session: Session) -> str:
"""Reset account status from Locked to Pending.
:param phone_number: The phone number belonging to the account to be unlocked.
:type phone_number: str
:param session:
:type session:
:return: The status of the pin reset.
:rtype: str
"""
session = SessionBase.bind_session(session=session)
account = Account.get_by_phone_number(phone_number=phone_number, session=session)
account.reset_account_pin()
session.add(account)
session.flush()
SessionBase.release_session(session=session)
response = f'Pin reset for user {phone_number} is successful!'
return response
def update_ussd_session(
ussd_session: InMemoryUssdSession,
user_input: str,
current_menu: str,
session_data: Optional[dict] = None) -> InMemoryUssdSession:
"""
Updates a ussd session
:param ussd_session: Session id value provided by AT
:type ussd_session: InMemoryUssdSession
:param user_input: Input from the request
:type user_input: str
:param current_menu: Menu name that is currently being displayed on the ussd session
:type current_menu: str
:param session_data: Any additional data that was persisted during the user's interaction with the system.
:type session_data: dict.
:return: ussd session object
:rtype: InMemoryUssdSession
"""
if session_data is None:
session_data = ussd_session.session_data
session = InMemoryUssdSession(
external_session_id=ussd_session.external_session_id,
msisdn=ussd_session.msisdn,
user_input=user_input,
state=current_menu,
service_code=ussd_session.service_code,
session_data=session_data
)
return session
def save_to_in_memory_ussd_session_data(queue: str, session: Session, session_data: dict, ussd_session: dict):
"""This function is used to save information to the session data attribute of a ussd session object in the redis
cache.
:param queue: The queue on which the celery task should run.
:type queue: str
:param session:
:type session:
:param session_data: A dictionary containing data for a specific ussd session in redis that needs to be saved
temporarily.
:type session_data: dict
:param ussd_session: A ussd session passed to the state machine.
:type ussd_session: UssdSession
"""
# define redis cache entry point
cache = InMemoryStore.cache
# get external session id
external_session_id = ussd_session.get('external_session_id')
# check for existing session data
existing_session_data = ussd_session.get('session_data')
# merge old session data with new inputs to session data
if existing_session_data:
session_data = {**existing_session_data, **session_data}
# get corresponding session record
in_redis_ussd_session = cache.get(external_session_id)
in_redis_ussd_session = json.loads(in_redis_ussd_session)
# create new in memory ussd session with current ussd session data
create_or_update_session(
external_session_id=external_session_id,
phone=in_redis_ussd_session.get('msisdn'),
service_code=in_redis_ussd_session.get('service_code'),
user_input=in_redis_ussd_session.get('user_input'),
current_menu=in_redis_ussd_session.get('state'),
session=session,
session_data=session_data
)
persist_session_to_db_task(external_session_id=external_session_id, queue=queue)

View File

@@ -29,9 +29,10 @@ def process_phone_number(phone_number: str, region: str):
pass
phone_number_object = phonenumbers.parse(phone_number, region)
parsed_phone_number = phonenumbers.format_number(phone_number_object, phonenumbers.PhoneNumberFormat.E164)
return phonenumbers.format_number(
phone_number_object, phonenumbers.PhoneNumberFormat.E164
)
return parsed_phone_number
class Support:
phone_number = None

View File

@@ -1,566 +0,0 @@
# standard imports
import datetime
import logging
import json
from typing import Optional
# third party imports
from sqlalchemy import desc
from cic_eth.api import Api
from sqlalchemy.orm.session import Session
from tinydb.table import Document
# local imports
from cic_ussd.account import define_account_tx_metadata, retrieve_account_statement
from cic_ussd.balance import BalanceManager, compute_operational_balance, get_cached_operational_balance
from cic_ussd.chain import Chain
from cic_ussd.db.models.account import Account
from cic_ussd.db.models.base import SessionBase
from cic_ussd.db.models.ussd_session import UssdSession
from cic_ussd.db.enum import AccountStatus
from cic_ussd.error import SeppukuError
from cic_ussd.menu.ussd_menu import UssdMenu
from cic_ussd.metadata import blockchain_address_to_metadata_pointer
from cic_ussd.phone_number import Support
from cic_ussd.redis import cache_data, create_cached_data_key, get_cached_data
from cic_ussd.state_machine import UssdStateMachine
from cic_ussd.conversions import to_wei, from_wei
from cic_ussd.translation import translation_for
from cic_types.models.person import generate_metadata_pointer, get_contact_data_from_vcard
logg = logging.getLogger(__name__)
def get_default_token_data():
chain_str = Chain.spec.__str__()
cic_eth_api = Api(chain_str=chain_str)
default_token_request_task = cic_eth_api.default_token()
default_token_data = default_token_request_task.get()
return default_token_data
def retrieve_token_symbol(chain_str: str = Chain.spec.__str__()):
"""
:param chain_str:
:type chain_str:
:return:
:rtype:
"""
cache_key = create_cached_data_key(
identifier=chain_str.encode('utf-8'),
salt=':cic.default_token_data'
)
cached_data = get_cached_data(key=cache_key)
if cached_data:
default_token_data = json.loads(cached_data)
return default_token_data.get('symbol')
else:
logg.warning('Cached default token data not found. Attempting retrieval from default token API')
default_token_data = get_default_token_data()
if default_token_data:
return default_token_data.get('symbol')
else:
raise SeppukuError(f'Could not retrieve default token for: {chain_str}')
def process_pin_authorization(account: Account, display_key: str, **kwargs) -> str:
"""This method provides translation for all ussd menu entries that follow the pin authorization pattern.
:param account: The account in a running USSD session.
:type account: Account
:param display_key: The path in the translation files defining an appropriate ussd response
:type display_key: str
:param kwargs: Any additional information required by the text values in the internationalization files.
:type kwargs
:return: A string value corresponding the ussd menu's text value.
:rtype: str
"""
remaining_attempts = 3
if account.failed_pin_attempts > 0:
return translation_for(
key=f'{display_key}.retry',
preferred_language=account.preferred_language,
remaining_attempts=(remaining_attempts - account.failed_pin_attempts)
)
else:
return translation_for(
key=f'{display_key}.first',
preferred_language=account.preferred_language,
**kwargs
)
def process_exit_insufficient_balance(account: Account, display_key: str, session: Session, ussd_session: dict):
"""This function processes the exit menu letting users their account balance is insufficient to perform a specific
transaction.
:param account: The account requesting access to the ussd menu.
:type account: Account
:param display_key: The path in the translation files defining an appropriate ussd response
:type display_key: str
:param session:
:type session:
:param ussd_session: A JSON serialized in-memory ussd session object
:type ussd_session: dict
:return: Corresponding translation text response
:rtype: str
"""
# get account balance
operational_balance = get_cached_operational_balance(blockchain_address=account.blockchain_address)
# compile response data
user_input = ussd_session.get('user_input').split('*')[-1]
transaction_amount = to_wei(value=int(user_input))
# get default data
token_symbol = retrieve_token_symbol()
recipient_phone_number = ussd_session.get('session_data').get('recipient_phone_number')
recipient = Account.get_by_phone_number(phone_number=recipient_phone_number, session=session)
tx_recipient_information = define_account_tx_metadata(user=recipient)
return translation_for(
key=display_key,
preferred_language=account.preferred_language,
amount=from_wei(transaction_amount),
token_symbol=token_symbol,
recipient_information=tx_recipient_information,
token_balance=operational_balance
)
def process_exit_successful_transaction(account: Account, display_key: str, session: Session, ussd_session: dict):
"""This function processes the exit menu after a successful initiation for a transfer of tokens.
:param account: The account requesting access to the ussd menu.
:type account: Account
:param display_key: The path in the translation files defining an appropriate ussd response
:type display_key: str
:param session:
:type session:
:param ussd_session: A JSON serialized in-memory ussd session object
:type ussd_session: dict
:return: Corresponding translation text response
:rtype: str
"""
transaction_amount = to_wei(int(ussd_session.get('session_data').get('transaction_amount')))
token_symbol = retrieve_token_symbol()
recipient_phone_number = ussd_session.get('session_data').get('recipient_phone_number')
recipient = Account.get_by_phone_number(phone_number=recipient_phone_number, session=session)
tx_recipient_information = define_account_tx_metadata(user=recipient)
tx_sender_information = define_account_tx_metadata(user=account)
return translation_for(
key=display_key,
preferred_language=account.preferred_language,
transaction_amount=from_wei(transaction_amount),
token_symbol=token_symbol,
recipient_information=tx_recipient_information,
sender_information=tx_sender_information
)
def process_transaction_pin_authorization(account: Account, display_key: str, session: Session, ussd_session: dict):
"""This function processes pin authorization where making a transaction is concerned. It constructs a
pre-transaction response menu that shows the details of the transaction.
:param account: The account requesting access to the ussd menu.
:type account: Account
:param display_key: The path in the translation files defining an appropriate ussd response
:type display_key: str
:param session:
:type session:
:param ussd_session: The USSD session determining what user data needs to be extracted and added to the menu's
text values.
:type ussd_session: UssdSession
:return: Corresponding translation text response
:rtype: str
"""
# compile response data
recipient_phone_number = ussd_session.get('session_data').get('recipient_phone_number')
recipient = Account.get_by_phone_number(phone_number=recipient_phone_number, session=session)
tx_recipient_information = define_account_tx_metadata(user=recipient)
tx_sender_information = define_account_tx_metadata(user=account)
token_symbol = retrieve_token_symbol()
user_input = ussd_session.get('session_data').get('transaction_amount')
transaction_amount = to_wei(value=int(user_input))
logg.debug('Requires integration to determine user tokens.')
return process_pin_authorization(
account=account,
display_key=display_key,
recipient_information=tx_recipient_information,
transaction_amount=from_wei(transaction_amount),
token_symbol=token_symbol,
sender_information=tx_sender_information
)
def process_account_balances(user: Account, display_key: str):
"""
:param user:
:type user:
:param display_key:
:type display_key:
:return:
:rtype:
"""
# retrieve cached balance
operational_balance = get_cached_operational_balance(blockchain_address=user.blockchain_address)
logg.debug('Requires call to retrieve tax and bonus amounts')
tax = ''
bonus = ''
token_symbol = retrieve_token_symbol()
return translation_for(
key=display_key,
preferred_language=user.preferred_language,
operational_balance=operational_balance,
tax=tax,
bonus=bonus,
token_symbol=token_symbol
)
def format_transactions(transactions: list, preferred_language: str, token_symbol: str):
formatted_transactions = ''
if len(transactions) > 0:
for transaction in transactions:
recipient_phone_number = transaction.get('recipient_phone_number')
sender_phone_number = transaction.get('sender_phone_number')
value = transaction.get('to_value')
timestamp = transaction.get('timestamp')
action_tag = transaction.get('action_tag')
direction = transaction.get('direction')
token_symbol = token_symbol
if action_tag == 'SENT' or action_tag == 'ULITUMA':
formatted_transactions += f'{action_tag} {value} {token_symbol} {direction} {recipient_phone_number} {timestamp}.\n'
else:
formatted_transactions += f'{action_tag} {value} {token_symbol} {direction} {sender_phone_number} {timestamp}. \n'
return formatted_transactions
else:
if preferred_language == 'en':
formatted_transactions = 'NO TRANSACTION HISTORY'
else:
formatted_transactions = 'HAMNA RIPOTI YA MATUMIZI'
return formatted_transactions
def process_display_user_metadata(user: Account, display_key: str):
"""
:param user:
:type user:
:param display_key:
:type display_key:
"""
key = generate_metadata_pointer(
identifier=blockchain_address_to_metadata_pointer(blockchain_address=user.blockchain_address),
cic_type=':cic.person'
)
cached_metadata = get_cached_data(key)
if cached_metadata:
user_metadata = json.loads(cached_metadata)
contact_data = get_contact_data_from_vcard(vcard=user_metadata.get('vcard'))
logg.debug(f'{contact_data}')
full_name = f'{contact_data.get("given")} {contact_data.get("family")}'
date_of_birth = user_metadata.get('date_of_birth')
year_of_birth = date_of_birth.get('year')
present_year = datetime.datetime.now().year
age = present_year - year_of_birth
gender = user_metadata.get('gender')
products = ', '.join(user_metadata.get('products'))
location = user_metadata.get('location').get('area_name')
return translation_for(
key=display_key,
preferred_language=user.preferred_language,
full_name=full_name,
age=age,
gender=gender,
location=location,
products=products
)
else:
# TODO [Philip]: All these translations could be moved to translation files.
logg.warning(f'Expected person metadata but found none in cache for key: {key}')
absent = ''
if user.preferred_language == 'en':
absent = 'Not provided'
elif user.preferred_language == 'sw':
absent = 'Haijawekwa'
return translation_for(
key=display_key,
preferred_language=user.preferred_language,
full_name=absent,
gender=absent,
age=absent,
location=absent,
products=absent
)
def process_account_statement(user: Account, display_key: str):
"""
:param user:
:type user:
:param display_key:
:type display_key:
:return:
:rtype:
"""
# retrieve cached statement
identifier = blockchain_address_to_metadata_pointer(blockchain_address=user.blockchain_address)
key = create_cached_data_key(identifier=identifier, salt=':cic.statement')
transactions = get_cached_data(key=key)
token_symbol = retrieve_token_symbol()
first_transaction_set = []
middle_transaction_set = []
last_transaction_set = []
transactions = json.loads(transactions)
if len(transactions) > 6:
last_transaction_set += transactions[6:]
middle_transaction_set += transactions[3:][:3]
first_transaction_set += transactions[:3]
# there are probably much cleaner and operational inexpensive ways to do this so find them
elif 3 < len(transactions) < 7:
middle_transaction_set += transactions[3:]
first_transaction_set += transactions[:3]
else:
first_transaction_set += transactions[:3]
if display_key == 'ussd.kenya.first_transaction_set':
return translation_for(
key=display_key,
preferred_language=user.preferred_language,
first_transaction_set=format_transactions(
transactions=first_transaction_set,
preferred_language=user.preferred_language,
token_symbol=token_symbol
)
)
elif display_key == 'ussd.kenya.middle_transaction_set':
return translation_for(
key=display_key,
preferred_language=user.preferred_language,
middle_transaction_set=format_transactions(
transactions=middle_transaction_set,
preferred_language=user.preferred_language,
token_symbol=token_symbol
)
)
elif display_key == 'ussd.kenya.last_transaction_set':
return translation_for(
key=display_key,
preferred_language=user.preferred_language,
last_transaction_set=format_transactions(
transactions=last_transaction_set,
preferred_language=user.preferred_language,
token_symbol=token_symbol
)
)
def process_start_menu(display_key: str, user: Account):
"""This function gets data on an account's balance and token in order to append it to the start of the start menu's
title. It passes said arguments to the translation function and returns the appropriate corresponding text from the
translation files.
:param user: The user requesting access to the ussd menu.
:type user: Account
:param display_key: The path in the translation files defining an appropriate ussd response
:type display_key: str
:return: Corresponding translation text response
:rtype: str
"""
token_symbol = retrieve_token_symbol()
chain_str = Chain.spec.__str__()
blockchain_address = user.blockchain_address
balance_manager = BalanceManager(address=blockchain_address,
chain_str=chain_str,
token_symbol=token_symbol)
# get balances synchronously for display on start menu
balances_data = balance_manager.get_balances()
key = create_cached_data_key(
identifier=bytes.fromhex(blockchain_address[2:]),
salt=':cic.balances_data'
)
cache_data(key=key, data=json.dumps(balances_data))
# get operational balance
operational_balance = compute_operational_balance(balances=balances_data)
# retrieve and cache account's statement
retrieve_account_statement(blockchain_address=blockchain_address)
return translation_for(
key=display_key,
preferred_language=user.preferred_language,
account_balance=operational_balance,
account_token_name=token_symbol
)
def retrieve_most_recent_ussd_session(phone_number: str, session: Session) -> UssdSession:
# get last ussd session based on user phone number
session = SessionBase.bind_session(session=session)
last_ussd_session = session.query(UssdSession)\
.filter_by(msisdn=phone_number)\
.order_by(desc(UssdSession.created))\
.first()
SessionBase.release_session(session=session)
return last_ussd_session
def process_request(account: Account, session, user_input: str, ussd_session: Optional[dict] = None) -> Document:
"""This function assesses a request based on the user from the request comes, the session_id and the user's
input. It determines whether the request translates to a return to an existing session by checking whether the
provided session id exists in the database or whether the creation of a new ussd session object is warranted.
It then returns the appropriate ussd menu text values.
:param account: The account requesting access to the ussd menu.
:type account: Account
:param session:
:type session:
:param user_input: The value a user enters in the ussd menu.
:type user_input: str
:param ussd_session: A JSON serialized in-memory ussd session object
:type ussd_session: dict
:return: A ussd menu's corresponding text value.
:rtype: Document
"""
if ussd_session:
if user_input == "0":
return UssdMenu.parent_menu(menu_name=ussd_session.get('state'))
else:
successive_state = next_state(
account=account,
session=session,
ussd_session=ussd_session,
user_input=user_input)
return UssdMenu.find_by_name(name=successive_state)
else:
if account.has_valid_pin():
last_ussd_session = retrieve_most_recent_ussd_session(phone_number=account.phone_number, session=session)
if last_ussd_session:
# get last state
last_state = last_ussd_session.state
# if last state is account_creation_prompt and metadata exists, show start menu
if last_state in [
'account_creation_prompt',
'exit',
'exit_invalid_pin',
'exit_invalid_new_pin',
'exit_pin_mismatch',
'exit_invalid_request',
'exit_successful_transaction'
]:
return UssdMenu.find_by_name(name='start')
else:
return UssdMenu.find_by_name(name=last_state)
else:
if account.failed_pin_attempts >= 3 and account.get_account_status() == AccountStatus.LOCKED.name:
return UssdMenu.find_by_name(name='exit_pin_blocked')
elif account.preferred_language is None:
return UssdMenu.find_by_name(name='initial_language_selection')
else:
return UssdMenu.find_by_name(name='initial_pin_entry')
def next_state(account: Account, session, ussd_session: dict, user_input: str) -> str:
"""This function navigates the state machine based on the ussd session object and user inputs it receives.
It checks the user input and provides the successive state in the state machine. It then updates the session's
state attribute with the new state.
:param account: The account requesting access to the ussd menu.
:type account: Account
:param session:
:type session:
:param ussd_session: A JSON serialized in-memory ussd session object
:type ussd_session: dict
:param user_input: The value a user enters in the ussd menu.
:type user_input: str
:return: A string value corresponding the successive give a specific state in the state machine.
"""
state_machine = UssdStateMachine(ussd_session=ussd_session)
state_machine.scan_data((user_input, ussd_session, account, session))
new_state = state_machine.state
return new_state
def process_exit_invalid_menu_option(display_key: str, preferred_language: str):
return translation_for(
key=display_key,
preferred_language=preferred_language,
support_phone=Support.phone_number
)
def custom_display_text(
account: Account,
display_key: str,
menu_name: str,
session: Session,
ussd_session: dict) -> str:
"""This function extracts the appropriate session data based on the current menu name. It then inserts them as
keywords in the i18n function.
:param account: The account in a running USSD session.
:type account: Account
:param display_key: The path in the translation files defining an appropriate ussd response
:type display_key: str
:param menu_name: The name by which a specific menu can be identified.
:type menu_name: str
:param session:
:type session:
:param ussd_session: A JSON serialized in-memory ussd session object
:type ussd_session: dict
:return: A string value corresponding the ussd menu's text value.
:rtype: str
"""
if menu_name == 'transaction_pin_authorization':
return process_transaction_pin_authorization(
account=account,
display_key=display_key,
session=session,
ussd_session=ussd_session)
elif menu_name == 'exit_insufficient_balance':
return process_exit_insufficient_balance(
account=account,
display_key=display_key,
session=session,
ussd_session=ussd_session)
elif menu_name == 'exit_successful_transaction':
return process_exit_successful_transaction(
account=account,
display_key=display_key,
session=session,
ussd_session=ussd_session)
elif menu_name == 'start':
return process_start_menu(display_key=display_key, user=account)
elif 'pin_authorization' in menu_name:
return process_pin_authorization(
account=account,
display_key=display_key,
session=session)
elif 'enter_current_pin' in menu_name:
return process_pin_authorization(
account=account,
display_key=display_key,
session=session)
elif menu_name == 'account_balances':
return process_account_balances(display_key=display_key, user=account)
elif 'transaction_set' in menu_name:
return process_account_statement(display_key=display_key, user=account)
elif menu_name == 'display_user_metadata':
return process_display_user_metadata(display_key=display_key, user=account)
elif menu_name == 'exit_invalid_menu_option':
return process_exit_invalid_menu_option(display_key=display_key, preferred_language=account.preferred_language)
else:
return translation_for(key=display_key, preferred_language=account.preferred_language)

View File

@@ -0,0 +1,305 @@
# standard imports
import json
import logging
# external imports
import i18n.config
from sqlalchemy.orm.session import Session
# local imports
from cic_ussd.account.balance import calculate_available_balance, get_balances, get_cached_available_balance
from cic_ussd.account.chain import Chain
from cic_ussd.account.metadata import get_cached_preferred_language
from cic_ussd.account.statement import (
get_cached_statement,
parse_statement_transactions,
query_statement,
statement_transaction_set
)
from cic_ussd.account.transaction import from_wei, to_wei
from cic_ussd.account.tokens import get_default_token_symbol
from cic_ussd.cache import cache_data_key, cache_data
from cic_ussd.db.models.account import Account
from cic_ussd.metadata import PersonMetadata
from cic_ussd.phone_number import Support
from cic_ussd.processor.util import latest_input, parse_person_metadata
from cic_ussd.translation import translation_for
logg = logging.getLogger(__name__)
class MenuProcessor:
def __init__(self, account: Account, display_key: str, menu_name: str, session: Session, ussd_session: dict):
self.account = account
self.display_key = display_key
self.identifier = bytes.fromhex(self.account.blockchain_address[2:])
self.menu_name = menu_name
self.session = session
self.ussd_session = ussd_session
def account_balances(self) -> str:
"""
:return:
:rtype:
"""
available_balance = get_cached_available_balance(self.account.blockchain_address)
logg.debug('Requires call to retrieve tax and bonus amounts')
tax = ''
bonus = ''
token_symbol = get_default_token_symbol()
preferred_language = get_cached_preferred_language(self.account.blockchain_address)
if not preferred_language:
preferred_language = i18n.config.get('fallback')
return translation_for(
key=self.display_key,
preferred_language=preferred_language,
available_balance=available_balance,
tax=tax,
bonus=bonus,
token_symbol=token_symbol
)
def account_statement(self) -> str:
"""
:return:
:rtype:
"""
cached_statement = get_cached_statement(self.account.blockchain_address)
statement = json.loads(cached_statement)
statement_transactions = parse_statement_transactions(statement)
transaction_sets = [statement_transactions[tx:tx+3] for tx in range(0, len(statement_transactions), 3)]
preferred_language = get_cached_preferred_language(self.account.blockchain_address)
if not preferred_language:
preferred_language = i18n.config.get('fallback')
first_transaction_set = []
middle_transaction_set = []
last_transaction_set = []
if transaction_sets:
first_transaction_set = statement_transaction_set(preferred_language, transaction_sets[0])
if len(transaction_sets) >= 2:
middle_transaction_set = statement_transaction_set(preferred_language, transaction_sets[1])
if len(transaction_sets) >= 3:
last_transaction_set = statement_transaction_set(preferred_language, transaction_sets[2])
if self.display_key == 'ussd.kenya.first_transaction_set':
return translation_for(
self.display_key, preferred_language, first_transaction_set=first_transaction_set
)
if self.display_key == 'ussd.kenya.middle_transaction_set':
return translation_for(
self.display_key, preferred_language, middle_transaction_set=middle_transaction_set
)
if self.display_key == 'ussd.kenya.last_transaction_set':
return translation_for(
self.display_key, preferred_language, last_transaction_set=last_transaction_set
)
def help(self):
preferred_language = get_cached_preferred_language(self.account.blockchain_address)
if not preferred_language:
preferred_language = i18n.config.get('fallback')
return translation_for(self.display_key, preferred_language, support_phone=Support.phone_number)
def person_metadata(self) -> str:
"""
:return:
:rtype:
"""
person_metadata = PersonMetadata(self.identifier)
cached_person_metadata = person_metadata.get_cached_metadata()
preferred_language = get_cached_preferred_language(self.account.blockchain_address)
if not preferred_language:
preferred_language = i18n.config.get('fallback')
if cached_person_metadata:
return parse_person_metadata(cached_person_metadata, self.display_key, preferred_language)
absent = translation_for('helpers.not_provided', preferred_language)
return translation_for(
self.display_key,
preferred_language,
full_name=absent,
gender=absent,
age=absent,
location=absent,
products=absent
)
def pin_authorization(self, **kwargs) -> str:
"""
:param kwargs:
:type kwargs:
:return:
:rtype:
"""
preferred_language = get_cached_preferred_language(self.account.blockchain_address)
if not preferred_language:
preferred_language = i18n.config.get('fallback')
if self.account.failed_pin_attempts == 0:
return translation_for(f'{self.display_key}.first', preferred_language, **kwargs)
remaining_attempts = 3
remaining_attempts -= self.account.failed_pin_attempts
retry_pin_entry = translation_for(
'ussd.kenya.retry_pin_entry', preferred_language, remaining_attempts=remaining_attempts
)
return translation_for(
f'{self.display_key}.retry', preferred_language, retry_pin_entry=retry_pin_entry
)
def start_menu(self):
"""
:return:
:rtype:
"""
token_symbol = get_default_token_symbol()
blockchain_address = self.account.blockchain_address
balances = get_balances(blockchain_address, Chain.spec.__str__(), token_symbol, False)[0]
key = cache_data_key(self.identifier, ':cic.balances')
cache_data(key, json.dumps(balances))
available_balance = calculate_available_balance(balances)
query_statement(blockchain_address)
preferred_language = get_cached_preferred_language(self.account.blockchain_address)
if not preferred_language:
preferred_language = i18n.config.get('fallback')
return translation_for(
self.display_key, preferred_language, account_balance=available_balance, account_token_name=token_symbol
)
def transaction_pin_authorization(self) -> str:
"""
:return:
:rtype:
"""
recipient_phone_number = self.ussd_session.get('data').get('recipient_phone_number')
recipient = Account.get_by_phone_number(recipient_phone_number, self.session)
tx_recipient_information = recipient.standard_metadata_id()
tx_sender_information = self.account.standard_metadata_id()
token_symbol = get_default_token_symbol()
user_input = self.ussd_session.get('data').get('transaction_amount')
transaction_amount = to_wei(value=int(user_input))
return self.pin_authorization(
recipient_information=tx_recipient_information,
transaction_amount=from_wei(transaction_amount),
token_symbol=token_symbol,
sender_information=tx_sender_information
)
def exit_insufficient_balance(self):
"""
:return:
:rtype:
"""
available_balance = get_cached_available_balance(self.account.blockchain_address)
preferred_language = get_cached_preferred_language(self.account.blockchain_address)
if not preferred_language:
preferred_language = i18n.config.get('fallback')
session_data = self.ussd_session.get('data')
transaction_amount = session_data.get('transaction_amount')
transaction_amount = to_wei(value=int(transaction_amount))
token_symbol = get_default_token_symbol()
recipient_phone_number = self.ussd_session.get('data').get('recipient_phone_number')
recipient = Account.get_by_phone_number(recipient_phone_number, self.session)
tx_recipient_information = recipient.standard_metadata_id()
return translation_for(
self.display_key,
preferred_language,
amount=from_wei(transaction_amount),
token_symbol=token_symbol,
recipient_information=tx_recipient_information,
token_balance=available_balance
)
def exit_invalid_menu_option(self):
preferred_language = get_cached_preferred_language(self.account.blockchain_address)
if not preferred_language:
preferred_language = i18n.config.get('fallback')
return translation_for(self.display_key, preferred_language, support_phone=Support.phone_number)
def exit_pin_blocked(self):
preferred_language = get_cached_preferred_language(self.account.blockchain_address)
if not preferred_language:
preferred_language = i18n.config.get('fallback')
return translation_for('ussd.kenya.exit_pin_blocked', preferred_language, support_phone=Support.phone_number)
def exit_successful_transaction(self):
"""
:return:
:rtype:
"""
amount = int(self.ussd_session.get('data').get('transaction_amount'))
preferred_language = get_cached_preferred_language(self.account.blockchain_address)
if not preferred_language:
preferred_language = i18n.config.get('fallback')
transaction_amount = to_wei(amount)
token_symbol = get_default_token_symbol()
recipient_phone_number = self.ussd_session.get('data').get('recipient_phone_number')
recipient = Account.get_by_phone_number(phone_number=recipient_phone_number, session=self.session)
tx_recipient_information = recipient.standard_metadata_id()
tx_sender_information = self.account.standard_metadata_id()
return translation_for(
self.display_key,
preferred_language,
transaction_amount=from_wei(transaction_amount),
token_symbol=token_symbol,
recipient_information=tx_recipient_information,
sender_information=tx_sender_information
)
def response(account: Account, display_key: str, menu_name: str, session: Session, ussd_session: dict) -> str:
"""This function extracts the appropriate session data based on the current menu name. It then inserts them as
keywords in the i18n function.
:param account: The account in a running USSD session.
:type account: Account
:param display_key: The path in the translation files defining an appropriate ussd response
:type display_key: str
:param menu_name: The name by which a specific menu can be identified.
:type menu_name: str
:param session:
:type session:
:param ussd_session: A JSON serialized in-memory ussd session object
:type ussd_session: dict
:return: A string value corresponding the ussd menu's text value.
:rtype: str
"""
menu_processor = MenuProcessor(account, display_key, menu_name, session, ussd_session)
if menu_name == 'start':
return menu_processor.start_menu()
if menu_name == 'help':
return menu_processor.help()
if menu_name == 'transaction_pin_authorization':
return menu_processor.transaction_pin_authorization()
if menu_name == 'exit_insufficient_balance':
return menu_processor.exit_insufficient_balance()
if menu_name == 'exit_successful_transaction':
return menu_processor.exit_successful_transaction()
if menu_name == 'account_balances':
return menu_processor.account_balances()
if 'pin_authorization' in menu_name:
return menu_processor.pin_authorization()
if 'enter_current_pin' in menu_name:
return menu_processor.pin_authorization()
if 'transaction_set' in menu_name:
return menu_processor.account_statement()
if menu_name == 'display_user_metadata':
return menu_processor.person_metadata()
if menu_name == 'exit_invalid_menu_option':
return menu_processor.exit_invalid_menu_option()
if menu_name == 'exit_pin_blocked':
return menu_processor.exit_pin_blocked()
preferred_language = get_cached_preferred_language(account.blockchain_address)
return translation_for(display_key, preferred_language)

View File

@@ -0,0 +1,185 @@
# standard imports
from typing import Optional
# external imports
import celery
import i18n
from sqlalchemy.orm.session import Session
from tinydb.table import Document
# local imports
from cic_ussd.db.models.account import Account, create
from cic_ussd.db.models.base import SessionBase
from cic_ussd.db.models.ussd_session import UssdSession
from cic_ussd.menu.ussd_menu import UssdMenu
from cic_ussd.processor.menu import response
from cic_ussd.processor.util import latest_input, resume_last_ussd_session
from cic_ussd.session.ussd_session import create_or_update_session, persist_ussd_session
from cic_ussd.state_machine import UssdStateMachine
from cic_ussd.translation import translation_for
from cic_ussd.validator import is_valid_response
def handle_menu(account: Account, session: Session) -> Document:
"""
:param account:
:type account:
:param session:
:type session:
:return:
:rtype:
"""
if account.pin_is_blocked(session):
return UssdMenu.find_by_name('exit_pin_blocked')
if account.has_valid_pin(session):
last_ussd_session = UssdSession.last_ussd_session(account.phone_number, session)
if last_ussd_session:
return resume_last_ussd_session(last_ussd_session.state)
elif not account.has_preferred_language():
return UssdMenu.find_by_name('initial_language_selection')
else:
return UssdMenu.find_by_name('initial_pin_entry')
def get_menu(account: Account,
session: Session,
user_input: str,
ussd_session: Optional[dict]) -> Document:
"""
:param account:
:type account:
:param session:
:type session:
:param user_input:
:type user_input:
:param ussd_session:
:type ussd_session:
:return:
:rtype:
"""
user_input = latest_input(user_input)
if not ussd_session:
return handle_menu(account, session)
if user_input == '':
return UssdMenu.find_by_name(name='exit_invalid_input')
if user_input == '0':
return UssdMenu.parent_menu(ussd_session.get('state'))
session = SessionBase.bind_session(session)
state = next_state(account, session, user_input, ussd_session)
return UssdMenu.find_by_name(state)
def handle_menu_operations(chain_str: str,
external_session_id: str,
phone_number: str,
queue: str,
service_code: str,
session,
user_input: str):
"""
:param chain_str:
:type chain_str:
:param external_session_id:
:type external_session_id:
:param phone_number:
:type phone_number:
:param queue:
:type queue:
:param service_code:
:type service_code:
:param session:
:type session:
:param user_input:
:type user_input:
:return:
:rtype:
"""
session = SessionBase.bind_session(session=session)
account: Account = Account.get_by_phone_number(phone_number, session)
if account:
return handle_account_menu_operations(account, external_session_id, queue, session, service_code, user_input)
create(chain_str, phone_number, session)
menu = UssdMenu.find_by_name('account_creation_prompt')
preferred_language = i18n.config.get('fallback')
create_or_update_session(
external_session_id=external_session_id,
msisdn=phone_number,
service_code=service_code,
state=menu.get('name'),
session=session,
user_input=user_input)
persist_ussd_session(external_session_id, queue)
return translation_for('ussd.kenya.account_creation_prompt', preferred_language)
def handle_account_menu_operations(account: Account,
external_session_id: str,
queue: str,
session: Session,
service_code: str,
user_input: str):
"""
:param account:
:type account:
:param external_session_id:
:type external_session_id:
:param queue:
:type queue:
:param session:
:type session:
:param service_code:
:type service_code:
:param user_input:
:type user_input:
:return:
:rtype:
"""
phone_number = account.phone_number
s_query_person_metadata = celery.signature(
'cic_ussd.tasks.metadata.query_person_metadata', [account.blockchain_address], queue='cic-ussd')
s_query_person_metadata.apply_async()
s_query_preferences_metadata = celery.signature(
'cic_ussd.tasks.metadata.query_preferences_metadata', [account.blockchain_address], queue='cic-ussd')
s_query_preferences_metadata.apply_async()
existing_ussd_session = session.query(UssdSession).filter_by(external_session_id=external_session_id).first()
last_ussd_session = UssdSession.last_ussd_session(phone_number, session)
if existing_ussd_session:
menu = get_menu(account, session, user_input, existing_ussd_session.to_json())
else:
menu = get_menu(account, session, user_input, None)
if last_ussd_session:
ussd_session = create_or_update_session(
external_session_id, phone_number, service_code, user_input, menu.get('name'), session,
last_ussd_session.data
)
else:
ussd_session = create_or_update_session(
external_session_id, phone_number, service_code, user_input, menu.get('name'), session, None
)
menu_response = response(
account, menu.get('display_key'), menu.get('name'), session, ussd_session.to_json()
)
if not is_valid_response(menu_response):
raise ValueError(f'Invalid response: {response}')
persist_ussd_session(external_session_id, queue)
return menu_response
def next_state(account: Account, session, user_input: str, ussd_session: dict) -> str:
"""
:param account:
:type account:
:param session:
:type session:
:param user_input:
:type user_input:
:param ussd_session:
:type ussd_session:
:return:
:rtype:
"""
state_machine = UssdStateMachine(ussd_session=ussd_session)
state_machine.scan_data((user_input, ussd_session, account, session))
return state_machine.state

View File

@@ -0,0 +1,77 @@
# standard imports
import datetime
import json
# external imports
from cic_types.models.person import get_contact_data_from_vcard
from tinydb.table import Document
# local imports
from cic_ussd.menu.ussd_menu import UssdMenu
from cic_ussd.translation import translation_for
def latest_input(user_input: str) -> str:
"""
:param user_input:
:type user_input:
:return:
:rtype:
"""
return user_input.split('*')[-1]
def parse_person_metadata(cached_metadata: str, display_key: str, preferred_language: str) -> str:
"""This function extracts person metadata formatted to suite display on the ussd interface.
:param cached_metadata: Person metadata JSON str.
:type cached_metadata: str
:param display_key: Path to an entry in menu data in translation files.
:type display_key: str
:param preferred_language: An account's set preferred language.
:type preferred_language: str
:return:
:rtype:
"""
user_metadata = json.loads(cached_metadata)
contact_data = get_contact_data_from_vcard(user_metadata.get('vcard'))
full_name = f'{contact_data.get("given")} {contact_data.get("family")}'
date_of_birth = user_metadata.get('date_of_birth')
year_of_birth = date_of_birth.get('year')
present_year = datetime.datetime.now().year
age = present_year - year_of_birth
gender = user_metadata.get('gender')
products = ', '.join(user_metadata.get('products'))
location = user_metadata.get('location').get('area_name')
return translation_for(
key=display_key,
preferred_language=preferred_language,
full_name=full_name,
age=age,
gender=gender,
location=location,
products=products
)
def resume_last_ussd_session(last_state: str) -> Document:
"""
:param last_state:
:type last_state:
:return:
:rtype:
"""
# TODO [Philip]: This can be cleaned further
non_reusable_states = [
'account_creation_prompt',
'exit',
'exit_invalid_pin',
'exit_invalid_new_pin',
'exit_invalid_request',
'exit_pin_blocked',
'exit_pin_mismatch',
'exit_successful_transaction'
]
if last_state in non_reusable_states:
return UssdMenu.find_by_name('start')
return UssdMenu.find_by_name(last_state)

View File

@@ -1,143 +0,0 @@
# standard imports
from typing import Optional, Tuple, Union
import json
import logging
import re
from typing import Optional, Union
from urllib.parse import urlparse, parse_qs
# third-party imports
from sqlalchemy import desc
from sqlalchemy.orm.session import Session
# local imports
from cic_ussd.db.models.account import Account
from cic_ussd.db.models.base import SessionBase
from cic_ussd.db.enum import AccountStatus
from cic_ussd.operations import get_account_status, reset_pin
from cic_ussd.validator import check_known_user
logg = logging.getLogger(__file__)
def get_query_parameters(env: dict, query_name: Optional[str] = None) -> Union[dict, str]:
"""Gets value of the request query parameters.
:param env: Object containing server and request information.
:type env: dict
:param query_name: The specific query parameter to fetch.
:type query_name: str
:return: Query parameters from the request.
:rtype: dict | str
"""
parsed_url = urlparse(env.get('REQUEST_URI'))
params = parse_qs(parsed_url.query)
if query_name:
param = params.get(query_name)[0]
return param
return params
def get_request_endpoint(env: dict) -> str:
"""Gets value of the request url path.
:param env: Object containing server and request information
:type env: dict
:return: Endpoint that has been touched by the call
:rtype: str
"""
return env.get('PATH_INFO')
def get_request_method(env: dict) -> str:
"""Gets value of the request method.
:param env: Object containing server and request information.
:type env: dict
:return: Request method.
:rtype: str
"""
return env.get('REQUEST_METHOD').upper()
def get_account_creation_callback_request_data(env: dict) -> tuple:
"""This function retrieves data from a callback
:param env: Object containing server and request information.
:type env: dict
:return: A tuple containing the status, result and task_id for a celery task spawned to create a blockchain
account.
:rtype: tuple
"""
callback_data = env.get('wsgi.input')
status = callback_data.get('status')
task_id = callback_data.get('root_id')
result = callback_data.get('result')
return status, task_id, result
def process_pin_reset_requests(env: dict, phone_number: str, session: Session):
"""This function processes requests that are responsible for the pin reset functionality. It processes GET and PUT
requests responsible for returning an account's status and
:param env: A dictionary of values representing data sent on the api.
:type env: dict
:param phone_number: The phone of the user whose pin is being reset.
:type phone_number: str
:param session:
:type session:
:return: A response denoting the result of the request to reset the user's pin.
:rtype: str
"""
if not check_known_user(phone_number=phone_number, session=session):
return f'No user matching {phone_number} was found.', '404 Not Found'
if get_request_method(env) == 'PUT':
return reset_pin(phone_number=phone_number, session=session), '200 OK'
if get_request_method(env) == 'GET':
status = get_account_status(phone_number=phone_number, session=session)
response = {
'status': f'{status}'
}
response = json.dumps(response)
return response, '200 OK'
def process_locked_accounts_requests(env: dict, session: Session) -> tuple:
"""This function authenticates staff requests and returns a serialized JSON formatted list of blockchain addresses
of accounts for which the PIN has been locked due to too many failed attempts.
:param env: A dictionary of values representing data sent on the api.
:type env: dict
:param session:
:type session:
:return: A tuple containing a serialized list of blockchain addresses for locked accounts and corresponding message
for the response.
:rtype: tuple
"""
session = SessionBase.bind_session(session=session)
response = ''
if get_request_method(env) == 'GET':
offset = 0
limit = 100
locked_accounts_path = r'/accounts/locked/(\d+)?/?(\d+)?'
r = re.match(locked_accounts_path, env.get('PATH_INFO'))
if r:
if r.lastindex > 1:
offset = r[1]
limit = r[2]
else:
limit = r[1]
locked_accounts = session.query(Account.blockchain_address).filter(
Account.account_status == AccountStatus.LOCKED.value,
Account.failed_pin_attempts >= 3).order_by(desc(Account.updated)).offset(offset).limit(limit).all()
# convert lists to scalar blockchain addresses
locked_accounts = [blockchain_address for (blockchain_address, ) in locked_accounts]
SessionBase.release_session(session=session)
response = json.dumps(locked_accounts)
return response, '200 OK'
return response, '405 Play by the rules'

View File

@@ -5,7 +5,7 @@ requests offering control of user account states to a staff behind the client.
# standard imports
import logging
from urllib.parse import quote_plus
# third-party imports
from confini import Config
@@ -13,12 +13,11 @@ from confini import Config
# local imports
from cic_ussd.db import dsn_from_config
from cic_ussd.db.models.base import SessionBase
from cic_ussd.operations import define_response_with_content
from cic_ussd.requests import (get_request_endpoint,
get_query_parameters,
process_pin_reset_requests,
process_locked_accounts_requests)
from cic_ussd.http.requests import get_request_endpoint
from cic_ussd.http.responses import with_content_headers
from cic_ussd.http.routes import locked_accounts, handle_pin_requests
from cic_ussd.runnable.server_base import exportable_parser, logg
args = exportable_parser.parse_args()
# define log levels
@@ -28,7 +27,7 @@ elif args.v:
logging.getLogger().setLevel(logging.INFO)
# parse config
config = Config(config_dir=args.c, env_prefix=args.env_prefix)
config = Config(args.c, args.env_prefix)
config.process()
config.censor('PASSWORD', 'DATABASE')
logg.debug('config loaded from {}:\n{}'.format(args.c, config))
@@ -56,20 +55,17 @@ def application(env, start_response):
session = SessionBase.create_session()
if get_request_endpoint(env) == '/pin':
phone_number = get_query_parameters(env=env, query_name='phoneNumber')
phone_number = quote_plus(phone_number)
response, message = process_pin_reset_requests(env=env, phone_number=phone_number, session=session)
response_bytes, headers = define_response_with_content(headers=errors_headers, response=response)
session.commit()
session.close()
start_response(message, headers)
return [response_bytes]
return handle_pin_requests(env, session, errors_headers, start_response)
# handle requests for locked accounts
response, message = process_locked_accounts_requests(env=env, session=session)
response_bytes, headers = define_response_with_content(headers=headers, response=response)
if get_request_endpoint(env) == '/healthz':
start_response('200 OK', headers)
return [b'']
response, message = locked_accounts(env, session)
response_bytes, headers = with_content_headers(headers, response)
start_response(message, headers)
session.commit()
session.close()
return [response_bytes]

View File

@@ -8,14 +8,17 @@ import tempfile
import celery
import i18n
import redis
from chainlib.chain import ChainSpec
from confini import Config
# local imports
from cic_ussd.account.chain import Chain
from cic_ussd.cache import Cache
from cic_ussd.db import dsn_from_config
from cic_ussd.db.models.base import SessionBase
from cic_ussd.metadata.signer import Signer
from cic_ussd.metadata.base import Metadata
from cic_ussd.redis import InMemoryStore
from cic_ussd.phone_number import Support
from cic_ussd.session.ussd_session import UssdSession as InMemoryUssdSession
from cic_ussd.validator import validate_presence
@@ -31,7 +34,8 @@ arg_parser.add_argument('-c', type=str, default=config_directory, help='config d
arg_parser.add_argument('-q', type=str, default='cic-ussd', help='queue name for worker tasks')
arg_parser.add_argument('-v', action='store_true', help='be verbose')
arg_parser.add_argument('-vv', action='store_true', help='be more verbose')
arg_parser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
arg_parser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str,
help='environment prefix for variables to overwrite configuration')
args = arg_parser.parse_args()
# define log levels
@@ -49,7 +53,8 @@ logg.debug('config loaded from {}:\n{}'.format(args.c, config))
# connect to database
data_source_name = dsn_from_config(config)
SessionBase.connect(data_source_name, pool_size=int(config.get('DATABASE_POOL_SIZE')), debug=config.true('DATABASE_DEBUG'))
SessionBase.connect(data_source_name, pool_size=int(config.get('DATABASE_POOL_SIZE')),
debug=config.true('DATABASE_DEBUG'))
# verify database connection with minimal sanity query
session = SessionBase.create_session()
@@ -57,12 +62,12 @@ session.execute('SELECT version_num FROM alembic_version')
session.close()
# define universal redis cache access
InMemoryStore.cache = redis.StrictRedis(host=config.get('REDIS_HOSTNAME'),
port=config.get('REDIS_PORT'),
password=config.get('REDIS_PASSWORD'),
db=config.get('REDIS_DATABASE'),
decode_responses=True)
InMemoryUssdSession.redis_cache = InMemoryStore.cache
Cache.store = redis.StrictRedis(host=config.get('REDIS_HOST'),
port=config.get('REDIS_PORT'),
password=config.get('REDIS_PASSWORD'),
db=config.get('REDIS_DATABASE'),
decode_responses=True)
InMemoryUssdSession.store = Cache.store
# define metadata URL
Metadata.base_url = config.get('CIC_META_URL')
@@ -79,9 +84,17 @@ if key_file_path:
Signer.key_file_path = key_file_path
# set up translations
i18n.load_path.append(config.get('APP_LOCALE_PATH'))
i18n.set('fallback', config.get('APP_LOCALE_FALLBACK'))
i18n.load_path.append(config.get('LOCALE_PATH'))
i18n.set('fallback', config.get('LOCALE_FALLBACK'))
chain_spec = ChainSpec(
common_name=config.get('CIC_COMMON_NAME'),
engine=config.get('CIC_ENGINE'),
network_id=config.get('CIC_NETWORK_ID')
)
Chain.spec = chain_spec
Support.phone_number = config.get('OFFICE_SUPPORT_PHONE')
# set up celery
current_app = celery.Celery(__name__)
@@ -110,12 +123,12 @@ if result[:4] == 'file':
result_queue = tempfile.mkdtemp()
current_app.conf.update({
'result_backend': 'file://{}'.format(result_queue),
})
})
logg.warning('celery backend store dir {} created, will NOT be deleted on shutdown'.format(result_queue))
else:
current_app.conf.update({
'result_backend': result,
})
})
import cic_ussd.tasks
@@ -135,4 +148,3 @@ def main():
if __name__ == '__main__':
main()

View File

@@ -14,26 +14,25 @@ from chainlib.chain import ChainSpec
from confini import Config
# local imports
from cic_ussd.chain import Chain
from cic_ussd.account.chain import Chain
from cic_ussd.account.tokens import query_default_token
from cic_ussd.cache import cache_data, cache_data_key, Cache
from cic_ussd.db import dsn_from_config
from cic_ussd.db.models.base import SessionBase
from cic_ussd.encoder import PasswordEncoder
from cic_ussd.error import InitializationError
from cic_ussd.files.local_files import create_local_file_data_stores, json_file_parser
from cic_ussd.http.requests import get_request_endpoint, get_request_method
from cic_ussd.http.responses import with_content_headers
from cic_ussd.menu.ussd_menu import UssdMenu
from cic_ussd.metadata.signer import Signer
from cic_ussd.metadata.base import Metadata
from cic_ussd.operations import (define_response_with_content,
process_menu_interaction_requests,
define_multilingual_responses)
from cic_ussd.metadata.signer import Signer
from cic_ussd.phone_number import process_phone_number, Support, E164Format
from cic_ussd.processor import get_default_token_data
from cic_ussd.redis import cache_data, create_cached_data_key, InMemoryStore
from cic_ussd.requests import (get_request_endpoint,
get_request_method)
from cic_ussd.processor.ussd import handle_menu_operations
from cic_ussd.runnable.server_base import exportable_parser, logg
from cic_ussd.session.ussd_session import UssdSession as InMemoryUssdSession
from cic_ussd.state_machine import UssdStateMachine
from cic_ussd.translation import translation_for
from cic_ussd.validator import check_ip, check_request_content_length, validate_phone_number, validate_presence
args = exportable_parser.parse_args()
@@ -45,7 +44,7 @@ elif args.v:
logging.getLogger().setLevel(logging.INFO)
# parse config
config = Config(config_dir=args.c, env_prefix=args.env_prefix)
config = Config(args.c, env_prefix=args.env_prefix)
config.process()
config.censor('PASSWORD', 'DATABASE')
logg.debug('config loaded from {}:\n{}'.format(args.c, config))
@@ -57,8 +56,8 @@ SessionBase.connect(data_source_name,
debug=config.true('DATABASE_DEBUG'))
# set up translations
i18n.load_path.append(config.get('APP_LOCALE_PATH'))
i18n.set('fallback', config.get('APP_LOCALE_FALLBACK'))
i18n.load_path.append(config.get('LOCALE_PATH'))
i18n.set('fallback', config.get('LOCALE_FALLBACK'))
# set Fernet key
PasswordEncoder.set_key(config.get('APP_PASSWORD_PEPPER'))
@@ -69,12 +68,12 @@ ussd_menu_db = create_local_file_data_stores(file_location=config.get('USSD_MENU
UssdMenu.ussd_menu_db = ussd_menu_db
# define universal redis cache access
InMemoryStore.cache = redis.StrictRedis(host=config.get('REDIS_HOSTNAME'),
port=config.get('REDIS_PORT'),
password=config.get('REDIS_PASSWORD'),
db=config.get('REDIS_DATABASE'),
decode_responses=True)
InMemoryUssdSession.redis_cache = InMemoryStore.cache
Cache.store = redis.StrictRedis(host=config.get('REDIS_HOST'),
port=config.get('REDIS_PORT'),
password=config.get('REDIS_PASSWORD'),
db=config.get('REDIS_DATABASE'),
decode_responses=True)
InMemoryUssdSession.store = Cache.store
# define metadata URL
Metadata.base_url = config.get('CIC_META_URL')
@@ -94,8 +93,8 @@ Signer.key_file_path = key_file_path
celery.Celery(backend=config.get('CELERY_RESULT_URL'), broker=config.get('CELERY_BROKER_URL'))
# load states and transitions data
states = json_file_parser(filepath=config.get('STATEMACHINE_STATES'))
transitions = json_file_parser(filepath=config.get('STATEMACHINE_TRANSITIONS'))
states = json_file_parser(filepath=config.get('MACHINE_STATES'))
transitions = json_file_parser(filepath=config.get('MACHINE_TRANSITIONS'))
chain_spec = ChainSpec(
common_name=config.get('CIC_COMMON_NAME'),
@@ -108,24 +107,22 @@ UssdStateMachine.states = states
UssdStateMachine.transitions = transitions
# retrieve default token data
default_token_data = get_default_token_data()
chain_str = Chain.spec.__str__()
default_token_data = query_default_token(chain_str)
# cache default token for re-usability
if default_token_data:
cache_key = create_cached_data_key(
identifier=chain_str.encode('utf-8'),
salt=':cic.default_token_data'
)
cache_key = cache_data_key(chain_str.encode('utf-8'), ':cic.default_token_data')
cache_data(key=cache_key, data=json.dumps(default_token_data))
else:
raise InitializationError(f'Default token data for: {chain_str} not found.')
valid_service_codes = config.get('APP_SERVICE_CODE').split(",")
valid_service_codes = config.get('USSD_SERVICE_CODE').split(",")
E164Format.region = config.get('PHONE_NUMBER_REGION')
Support.phone_number = config.get('APP_SUPPORT_PHONE_NUMBER')
E164Format.region = config.get('E164_REGION')
Support.phone_number = config.get('OFFICE_SUPPORT_PHONE')
def application(env, start_response):
@@ -168,49 +165,37 @@ def application(env, start_response):
except TypeError:
user_input = ""
# add validation for phone number
if phone_number:
phone_number = process_phone_number(phone_number=phone_number, region=E164Format.region)
# validate ip address
if not check_ip(config=config, env=env):
start_response('403 Sneaky, sneaky', errors_headers)
return []
# validate content length
if not check_request_content_length(config=config, env=env):
start_response('400 Size matters', errors_headers)
return []
# validate service code
if service_code not in valid_service_codes:
response = define_multilingual_responses(
key='ussd.kenya.invalid_service_code',
locales=['en', 'sw'],
prefix='END',
valid_service_code=valid_service_codes[0])
response_bytes, headers = define_response_with_content(headers=headers, response=response)
response = translation_for(
'ussd.kenya.invalid_service_code',
i18n.config.get('fallback'),
valid_service_code=valid_service_codes[0]
)
response_bytes, headers = with_content_headers(headers, response)
start_response('200 OK', headers)
return [response_bytes]
# validate phone number
if not validate_phone_number(phone_number):
logg.error('invalid phone number {}'.format(phone_number))
start_response('400 Invalid phone number format', errors_headers)
return []
logg.debug('session {} started for {}'.format(external_session_id, phone_number))
# handle menu interaction requests
chain_str = chain_spec.__str__()
response = process_menu_interaction_requests(chain_str=chain_str,
external_session_id=external_session_id,
phone_number=phone_number,
queue=args.q,
service_code=service_code,
session=session,
user_input=user_input)
response_bytes, headers = define_response_with_content(headers=headers, response=response)
response = handle_menu_operations(
chain_str, external_session_id, phone_number, args.q, service_code, session, user_input
)
response_bytes, headers = with_content_headers(headers, response)
start_response('200 OK,', headers)
session.commit()
session.close()
@@ -223,4 +208,3 @@ def application(env, start_response):
session.close()
start_response('405 Play by the rules', errors_headers)
return []

View File

@@ -3,9 +3,15 @@ import logging
from typing import Optional
import json
# third party imports
# external imports
import celery
from redis import Redis
from sqlalchemy.orm.session import Session
# local imports
from cic_ussd.cache import Cache
from cic_ussd.db.models.base import SessionBase
from cic_ussd.db.models.ussd_session import UssdSession as DbUssdSession
logg = logging.getLogger()
@@ -13,18 +19,18 @@ logg = logging.getLogger()
class UssdSession:
"""
This class defines the USSD session object that is called whenever a user interacts with the system.
:cvar redis_cache: The in-memory redis cache.
:type redis_cache: Redis
:cvar store: The in-memory redis cache.
:type store: Redis
"""
redis_cache: Redis = None
store: Redis = None
def __init__(self,
external_session_id: str,
service_code: str,
msisdn: str,
user_input: str,
service_code: str,
state: str,
session_data: Optional[dict] = None):
user_input: str,
data: Optional[dict] = None):
"""
This function is called whenever a USSD session object is created and saves the instance to a JSON DB.
:param external_session_id: The Africa's Talking session ID.
@@ -37,16 +43,17 @@ class UssdSession:
:type user_input: str.
:param state: The name of the USSD menu that the user was interacting with.
:type state: str.
:param session_data: Any additional data that was persisted during the user's interaction with the system.
:type session_data: dict.
:param data: Any additional data that was persisted during the user's interaction with the system.
:type data: dict.
"""
self.data = data
self.external_session_id = external_session_id
self.service_code = service_code
self.msisdn = msisdn
self.user_input = user_input
self.service_code = service_code
self.state = state
self.session_data = session_data
session = self.redis_cache.get(external_session_id)
self.user_input = user_input
session = self.store.get(external_session_id)
if session:
session = json.loads(session)
self.version = session.get('version') + 1
@@ -54,16 +61,16 @@ class UssdSession:
self.version = 1
self.session = {
'data': self.data,
'external_session_id': self.external_session_id,
'service_code': self.service_code,
'msisdn': self.msisdn,
'user_input': self.user_input,
'service_code': self.service_code,
'state': self.state,
'session_data': self.session_data,
'user_input': self.user_input,
'version': self.version
}
self.redis_cache.set(self.external_session_id, json.dumps(self.session))
self.redis_cache.persist(self.external_session_id)
self.store.set(self.external_session_id, json.dumps(self.session))
self.store.persist(self.external_session_id)
def set_data(self, key: str, value: str) -> None:
"""
@@ -73,10 +80,10 @@ class UssdSession:
:param value: The actual data to be stored in the session data.
:type value: str.
"""
if self.session_data is None:
self.session_data = {}
self.session_data[key] = value
self.redis_cache.set(self.external_session_id, json.dumps(self.session))
if self.data is None:
self.data = {}
self.data[key] = value
self.store.set(self.external_session_id, json.dumps(self.session))
def get_data(self, key: str) -> Optional[str]:
"""
@@ -86,8 +93,8 @@ class UssdSession:
:return: This function returns the queried data if found, else it doesn't return any value.
:rtype: str.
"""
if self.session_data is not None:
return self.session_data.get(key)
if self.data is not None:
return self.data.get(key)
else:
return None
@@ -97,11 +104,155 @@ class UssdSession:
:rtype: dict
"""
return {
"data": self.data,
"external_session_id": self.external_session_id,
"service_code": self.service_code,
"msisdn": self.msisdn,
"user_input": self.user_input,
"service_code": self.service_code,
"state": self.state,
"session_data": self.session_data,
"version": self.version
}
def create_ussd_session(
state: str,
external_session_id: str,
msisdn: str,
service_code: str,
user_input: str,
data: Optional[dict] = None) -> UssdSession:
"""
:param state:
:type state:
:param external_session_id:
:type external_session_id:
:param msisdn:
:type msisdn:
:param service_code:
:type service_code:
:param user_input:
:type user_input:
:param data:
:type data:
:return:
:rtype:
"""
return UssdSession(external_session_id=external_session_id,
msisdn=msisdn,
user_input=user_input,
state=state,
service_code=service_code,
data=data
)
def update_ussd_session(ussd_session: UssdSession,
user_input: str,
state: str,
data: Optional[dict] = None) -> UssdSession:
""""""
if data is None:
data = ussd_session.data
return UssdSession(
external_session_id=ussd_session.external_session_id,
msisdn=ussd_session.msisdn,
user_input=user_input,
state=state,
service_code=ussd_session.service_code,
data=data
)
def create_or_update_session(external_session_id: str,
msisdn: str,
service_code: str,
user_input: str,
state: str,
session,
data: Optional[dict] = None) -> UssdSession:
"""
:param external_session_id:
:type external_session_id:
:param msisdn:
:type msisdn:
:param service_code:
:type service_code:
:param user_input:
:type user_input:
:param state:
:type state:
:param session:
:type session:
:param data:
:type data:
:return:
:rtype:
"""
session = SessionBase.bind_session(session=session)
existing_ussd_session = session.query(DbUssdSession).filter_by(
external_session_id=external_session_id).first()
if existing_ussd_session:
ussd_session = update_ussd_session(ussd_session=existing_ussd_session,
state=state,
user_input=user_input,
data=data
)
else:
ussd_session = create_ussd_session(external_session_id=external_session_id,
msisdn=msisdn,
service_code=service_code,
user_input=user_input,
state=state,
data=data
)
SessionBase.release_session(session=session)
return ussd_session
def persist_ussd_session(external_session_id: str, queue: Optional[str]):
"""This function asynchronously retrieves a cached ussd session object matching an external ussd session id and adds
it to persistent storage.
:param external_session_id: Session id value provided by ussd service provided.
:type external_session_id: str
:param queue: Name of worker queue to submit tasks to.
:type queue: str
"""
s_persist_ussd_session = celery.signature(
'cic_ussd.tasks.ussd_session.persist_session_to_db',
[external_session_id],
queue=queue
)
s_persist_ussd_session.apply_async()
def save_session_data(queue: Optional[str], session: Session, data: dict, ussd_session: dict):
"""This function is stores information to the session data attribute of a cached ussd session object.
:param data: A dictionary containing data for a specific ussd session in redis that needs to be saved
temporarily.
:type data: dict
:param queue: The queue on which the celery task should run.
:type queue: str
:param session: Database session object.
:type session: Session
:param ussd_session: A ussd session passed to the state machine.
:type ussd_session: UssdSession
"""
cache = Cache.store
external_session_id = ussd_session.get('external_session_id')
existing_session_data = ussd_session.get('data')
if existing_session_data:
data = {**existing_session_data, **data}
in_redis_ussd_session = cache.get(external_session_id)
in_redis_ussd_session = json.loads(in_redis_ussd_session)
create_or_update_session(
external_session_id=external_session_id,
msisdn=in_redis_ussd_session.get('msisdn'),
service_code=in_redis_ussd_session.get('service_code'),
user_input=in_redis_ussd_session.get('user_input'),
state=in_redis_ussd_session.get('state'),
session=session,
data=data
)
persist_ussd_session(external_session_id=external_session_id, queue=queue)

View File

@@ -0,0 +1,248 @@
# standard imports
import json
import logging
from typing import Tuple
# third-party imports
import celery
import i18n
from chainlib.hash import strip_0x
from cic_types.models.person import get_contact_data_from_vcard, generate_vcard_from_contact_data, manage_identity_data
# local imports
from cic_ussd.account.chain import Chain
from cic_ussd.account.maps import gender, language
from cic_ussd.account.metadata import get_cached_preferred_language
from cic_ussd.db.models.account import Account
from cic_ussd.db.models.base import SessionBase
from cic_ussd.error import MetadataNotFoundError
from cic_ussd.metadata import PersonMetadata
from cic_ussd.session.ussd_session import save_session_data
from cic_ussd.translation import translation_for
from sqlalchemy.orm.session import Session
logg = logging.getLogger(__file__)
def change_preferred_language(state_machine_data: Tuple[str, dict, Account, Session]):
"""
:param state_machine_data:
:type state_machine_data:
:return:
:rtype:
"""
user_input, ussd_session, account, session = state_machine_data
r_user_input = language().get(user_input)
session = SessionBase.bind_session(session)
account.preferred_language = r_user_input
session.add(account)
session.flush()
SessionBase.release_session(session)
preferences_data = {
'preferred_language': r_user_input
}
s = celery.signature(
'cic_ussd.tasks.metadata.add_preferences_metadata',
[account.blockchain_address, preferences_data],
queue='cic-ussd'
)
return s.apply_async()
def update_account_status_to_active(state_machine_data: Tuple[str, dict, Account, Session]):
"""This function sets user's account to active.
:param state_machine_data: A tuple containing user input, a ussd session and user object.
:type state_machine_data: tuple
"""
user_input, ussd_session, account, session = state_machine_data
session = SessionBase.bind_session(session=session)
account.activate_account()
session.add(account)
session.flush()
SessionBase.release_session(session=session)
def parse_gender(account: Account, user_input: str):
"""
:param account:
:type account:
:param user_input:
:type user_input:
:return:
:rtype:
"""
preferred_language = get_cached_preferred_language(account.blockchain_address)
if not preferred_language:
preferred_language = i18n.config.get('fallback')
r_user_input = gender().get(user_input)
return translation_for(f'helpers.{r_user_input}', preferred_language)
def save_metadata_attribute_to_session_data(state_machine_data: Tuple[str, dict, Account, Session]):
"""This function saves first name data to the ussd session in the redis cache.
:param state_machine_data: A tuple containing user input, a ussd session and user object.
:type state_machine_data: tuple
"""
user_input, ussd_session, account, session = state_machine_data
session = SessionBase.bind_session(session=session)
current_state = ussd_session.get('state')
key = ''
if 'given_name' in current_state:
key = 'given_name'
if 'date_of_birth' in current_state:
key = 'date_of_birth'
if 'family_name' in current_state:
key = 'family_name'
if 'gender' in current_state:
key = 'gender'
user_input = parse_gender(account, user_input)
if 'location' in current_state:
key = 'location'
if 'products' in current_state:
key = 'products'
if ussd_session.get('data'):
data = ussd_session.get('data')
data[key] = user_input
else:
data = {
key: user_input
}
save_session_data('cic-ussd', session, data, ussd_session)
SessionBase.release_session(session)
def parse_person_metadata(account: Account, metadata: dict):
"""
:param account:
:type account:
:param metadata:
:type metadata:
:return:
:rtype:
"""
set_gender = metadata.get('gender')
given_name = metadata.get('given_name')
family_name = metadata.get('family_name')
email = metadata.get('email')
if isinstance(metadata.get('date_of_birth'), dict):
date_of_birth = metadata.get('date_of_birth')
else:
date_of_birth = {
"year": int(metadata.get('date_of_birth')[:4])
}
if isinstance(metadata.get('location'), dict):
location = metadata.get('location')
else:
location = {
"area_name": metadata.get('location')
}
if isinstance(metadata.get('products'), list):
products = metadata.get('products')
else:
products = metadata.get('products').split(',')
phone_number = account.phone_number
date_registered = int(account.created.replace().timestamp())
blockchain_address = account.blockchain_address
chain_spec = f'{Chain.spec.common_name()}:{Chain.spec.engine()}: {Chain.spec.chain_id()}'
if isinstance(metadata.get('identities'), dict):
identities = metadata.get('identities')
else:
identities = manage_identity_data(
blockchain_address=blockchain_address,
blockchain_type=Chain.spec.engine(),
chain_spec=chain_spec
)
return {
"date_registered": date_registered,
"date_of_birth": date_of_birth,
"gender": set_gender,
"identities": identities,
"location": location,
"products": products,
"vcard": generate_vcard_from_contact_data(
email=email,
family_name=family_name,
given_name=given_name,
tel=phone_number
)
}
def save_complete_person_metadata(state_machine_data: Tuple[str, dict, Account, Session]):
"""This function persists elements of the user metadata stored in session data
:param state_machine_data: A tuple containing user input, a ussd session and user object.
:type state_machine_data: tuple
"""
user_input, ussd_session, account, session = state_machine_data
metadata = ussd_session.get('data')
person_metadata = parse_person_metadata(account, metadata)
blockchain_address = account.blockchain_address
s_create_person_metadata = celery.signature(
'cic_ussd.tasks.metadata.create_person_metadata', [blockchain_address, person_metadata], queue='cic-ussd')
s_create_person_metadata.apply_async()
def edit_user_metadata_attribute(state_machine_data: Tuple[str, dict, Account, Session]):
"""
:param state_machine_data:
:type state_machine_data:
:return:
:rtype:
"""
user_input, ussd_session, account, session = state_machine_data
blockchain_address = account.blockchain_address
identifier = bytes.fromhex(strip_0x(blockchain_address))
person_metadata = PersonMetadata(identifier)
cached_person_metadata = person_metadata.get_cached_metadata()
if not cached_person_metadata:
raise MetadataNotFoundError(f'Expected user metadata but found none in cache for key: {blockchain_address}')
person_metadata = json.loads(cached_person_metadata)
data = ussd_session.get('data')
contact_data = {}
vcard = person_metadata.get('vcard')
if vcard:
contact_data = get_contact_data_from_vcard(vcard)
person_metadata.pop('vcard')
given_name = data.get('given_name') or contact_data.get('given')
family_name = data.get('family_name') or contact_data.get('family')
date_of_birth = data.get('date_of_birth') or person_metadata.get('date_of_birth')
set_gender = data.get('gender') or person_metadata.get('gender')
location = data.get('location') or person_metadata.get('location')
products = data.get('products') or person_metadata.get('products')
if isinstance(date_of_birth, str):
year = int(date_of_birth)
person_metadata['date_of_birth'] = {'year': year}
person_metadata['gender'] = set_gender
person_metadata['given_name'] = given_name
person_metadata['family_name'] = family_name
if isinstance(location, str):
location_data = person_metadata.get('location')
location_data['area_name'] = location
person_metadata['location'] = location_data
person_metadata['products'] = products
if contact_data:
contact_data.pop('given')
contact_data.pop('family')
contact_data.pop('tel')
person_metadata = {**person_metadata, **contact_data}
parsed_person_metadata = parse_person_metadata(account, person_metadata)
s_edit_person_metadata = celery.signature(
'cic_ussd.tasks.metadata.create_person_metadata',
[blockchain_address, parsed_person_metadata]
)
s_edit_person_metadata.apply_async(queue='cic-ussd')

Some files were not shown because too many files have changed in this diff Show More