Compare commits

...

13 Commits

Author SHA1 Message Date
nolash
1e1f2d5dc0 Rehabilitate sovereign import 2021-04-07 11:08:19 +02:00
Louis Holbrook
5a4e0b8eba Merge branch 'lash/external-notify-tasks' into 'master'
Discover queue for external tasks

See merge request grassrootseconomics/cic-internal-integration!89
2021-04-07 06:31:26 +00:00
Louis Holbrook
226699568f Discover queue for external tasks 2021-04-07 06:31:26 +00:00
Louis Holbrook
ec2b0e56e5 Merge branch 'lash/add-missing-state-file' into 'master'
Add stat file

See merge request grassrootseconomics/cic-internal-integration!92
2021-04-07 06:26:44 +00:00
nolash
6ffaca5207 Add stat file 2021-04-07 08:24:10 +02:00
Louis Holbrook
5c6375c9ec Merge branch 'lash/calculate-block-time' into 'master'
Add block time calc to retry, tracker

See merge request grassrootseconomics/cic-internal-integration!90
2021-04-06 19:11:43 +00:00
Louis Holbrook
99f55f01ed Add block time calc to retry, tracker 2021-04-06 19:11:42 +00:00
Louis Holbrook
086308fdb8 Merge branch 'lash/remove-stray-stat' into 'master'
Remove stray chainlib stat

See merge request grassrootseconomics/cic-internal-integration!88
2021-04-06 18:11:31 +00:00
nolash
f8f74a17f6 Remove stray chainlib stat 2021-04-06 20:08:18 +02:00
fd629cdc51 Merge branch 'philip/ussd-bug-fixes' into 'master'
Philip/ussd bug fixes

See merge request grassrootseconomics/cic-internal-integration!72
2021-04-06 17:53:38 +00:00
e9fb80ab78 Philip/ussd bug fixes 2021-04-06 17:53:38 +00:00
Louis Holbrook
7728f38f14 Merge branch 'lash/4k' into 'master'
Make 40k import pass

See merge request grassrootseconomics/cic-internal-integration!86
2021-04-06 15:14:04 +00:00
Louis Holbrook
a305aafc86 Make 40k import pass 2021-04-06 15:14:04 +00:00
48 changed files with 353 additions and 153 deletions

View File

@@ -1,13 +1,13 @@
cic-base~=0.1.2a58
cic-base~=0.1.2a62
alembic==1.4.2
confini~=0.3.6rc3
uwsgi==2.0.19.1
moolb~=0.1.0
cic-eth-registry~=0.5.4a10
cic-eth-registry~=0.5.4a12
SQLAlchemy==1.3.20
semver==2.13.0
psycopg2==2.8.6
celery==4.4.7
redis==3.5.3
chainlib~=0.0.2a2
chainlib~=0.0.2a5
chainsyncer~=0.0.1a21

View File

@@ -61,8 +61,8 @@ def balance(tokens, holder_address, chain_spec_dict):
for t in tokens:
address = t['address']
token = ERC20Token(rpc, address)
c = ERC20()
token = ERC20Token(chain_spec, rpc, address)
c = ERC20(chain_spec)
o = c.balance_of(address, holder_address, sender_address=caller_address)
r = rpc.do(o)
t['balance_network'] = c.parse_balance(r)

View File

@@ -171,6 +171,7 @@ def sync_tx(self, tx_hash_hex, chain_spec_dict):
except NotFoundEthException as e:
pass
# TODO: apply receipt in tx object to validate and normalize input
if rcpt != None:
success = rcpt['status'] == 1
logg.debug('sync tx {} mined block {} success {}'.format(tx_hash_hex, rcpt['blockNumber'], success))
@@ -180,6 +181,7 @@ def sync_tx(self, tx_hash_hex, chain_spec_dict):
[
tx_hash_hex,
rcpt['blockNumber'],
rcpt['transactionIndex'],
not success,
],
queue=queue,

View File

@@ -23,7 +23,7 @@ def translate_address(address, trusted_addresses, chain_spec, sender_address=ZER
registry = CICRegistry(chain_spec, rpc)
declarator_address = registry.by_name('AddressDeclarator', sender_address=sender_address)
c = AddressDeclarator()
c = AddressDeclarator(chain_spec)
for trusted_address in trusted_addresses:
o = c.declaration(declarator_address, trusted_address, address, sender_address=sender_address)

View File

@@ -20,10 +20,10 @@ def set_sent(chain_spec_dict, tx_hash, fail=False):
@celery_app.task(base=CriticalSQLAlchemyTask)
def set_final(chain_spec_dict, tx_hash, block=None, fail=False):
def set_final(chain_spec_dict, tx_hash, block=None, tx_index=None, fail=False):
chain_spec = ChainSpec.from_dict(chain_spec_dict)
session = SessionBase.create_session()
r = chainqueue.state.set_final(chain_spec, tx_hash, block, fail, session=session)
r = chainqueue.state.set_final(chain_spec, tx_hash, block=block, tx_index=tx_index, fail=fail, session=session)
session.close()
return r

View File

@@ -39,6 +39,7 @@ class TxFilter(SyncFilter):
self.chain_spec.asdict(),
add_0x(tx_hash_hex),
tx.block.number,
tx.index,
tx.status == Status.ERROR,
],
queue=self.queue,

View File

@@ -20,6 +20,7 @@ from cic_eth.admin.ctrl import lock_send
from cic_eth.db.enum import LockEnum
from cic_eth.runnable.daemons.filters.straggler import StragglerFilter
from cic_eth.sync.retry import RetrySyncer
from cic_eth.stat import init_chain_stat
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
@@ -71,57 +72,21 @@ RPCConnection.register_location(config.get('ETH_PROVIDER'), chain_spec, tag='def
dsn = dsn_from_config(config)
SessionBase.connect(dsn, debug=config.true('DATABASE_DEBUG'))
straggler_delay = int(config.get('CIC_TX_RETRY_DELAY'))
## TODO: we already have the signed raw tx in get, so its a waste of cycles to get_tx here
#def sendfail_filter(w3, tx_hash, rcpt, chain_spec):
# tx_dict = get_tx(tx_hash)
# tx = unpack(tx_dict['signed_tx'], chain_spec)
# logg.debug('submitting tx {} for retry'.format(tx_hash))
# s_check = celery.signature(
# 'cic_eth.admin.ctrl.check_lock',
[
# tx_hash,
# chain_str,
# LockEnum.QUEUE,
# tx['from'],
# ],
# queue=queue,
# )
## s_resume = celery.signature(
## 'cic_eth.eth.tx.resume_tx',
## [
## chain_str,
## ],
## queue=queue,
## )
#
## s_retry_status = celery.signature(
## 'cic_eth.queue.state.set_ready',
## [],
## queue=queue,
## )
# s_resend = celery.signature(
# 'cic_eth.eth.gas.resend_with_higher_gas',
# [
# chain_str,
# ],
# queue=queue,
# )
#
# #s_resume.link(s_retry_status)
# #s_check.link(s_resume)
# s_check.link(s_resend)
# s_check.apply_async()
def main():
conn = RPCConnection.connect(chain_spec, 'default')
straggler_delay = int(config.get('CIC_TX_RETRY_DELAY'))
loop_interval = config.get('SYNCER_LOOP_INTERVAL')
if loop_interval == None:
stat = init_chain_stat(conn)
loop_interval = stat.block_average()
syncer = RetrySyncer(conn, chain_spec, straggler_delay, batch_size=config.get('_BATCH_SIZE'))
syncer.backend.set(0, 0)
fltr = StragglerFilter(chain_spec, queue=queue)
syncer.add_filter(fltr)
syncer.loop(float(straggler_delay), conn)
syncer.loop(int(loop_interval), conn)
if __name__ == '__main__':

View File

@@ -7,7 +7,7 @@ import argparse
import sys
import re
# third-party imports
# external imports
import confini
import celery
import rlp
@@ -42,6 +42,7 @@ from cic_eth.runnable.daemons.filters import (
RegistrationFilter,
TransferAuthFilter,
)
from cic_eth.stat import init_chain_stat
script_dir = os.path.realpath(os.path.dirname(__file__))
@@ -66,7 +67,6 @@ chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
#RPCConnection.register_location(config.get('ETH_PROVIDER'), chain_spec, 'default')
cic_base.rpc.setup(chain_spec, config.get('ETH_PROVIDER'))
def main():
# connect to celery
celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL'))
@@ -76,7 +76,13 @@ def main():
o = block_latest()
r = rpc.do(o)
block_offset = int(strip_0x(r), 16) + 1
block_current = int(r, 16)
block_offset = block_current + 1
loop_interval = config.get('SYNCER_LOOP_INTERVAL')
if loop_interval == None:
stat = init_chain_stat(rpc, block_start=block_current)
loop_interval = stat.block_average()
logg.debug('starting at block {}'.format(block_offset))
@@ -140,7 +146,8 @@ def main():
for cf in callback_filters:
syncer.add_filter(cf)
r = syncer.loop(int(config.get('SYNCER_LOOP_INTERVAL')), rpc)
#r = syncer.loop(int(config.get('SYNCER_LOOP_INTERVAL')), rpc)
r = syncer.loop(int(loop_interval), rpc)
sys.stderr.write("sync {} done at block {}\n".format(syncer, r))
i += 1

View File

@@ -0,0 +1,33 @@
# standard imports
import logging
# external imports
from chainlib.stat import ChainStat
from chainlib.eth.block import (
block_latest,
block_by_number,
Block,
)
logg = logging.getLogger().getChild(__name__)
BLOCK_SAMPLES = 10
def init_chain_stat(rpc, block_start=0):
stat = ChainStat()
if block_start == 0:
o = block_latest()
r = rpc.do(o)
block_start = int(r, 16)
for i in range(BLOCK_SAMPLES):
o = block_by_number(block_start-10+i)
block_src = rpc.do(o)
logg.debug('block {}'.format(block_src))
block = Block(block_src)
stat.block_apply(block)
logg.debug('calculated block time {} from {} block samples'.format(stat.block_average(), BLOCK_SAMPLES))
return stat

View File

@@ -10,7 +10,7 @@ version = (
0,
11,
0,
'alpha.4',
'beta.1',
)
version_object = semver.VersionInfo(

View File

@@ -2,3 +2,4 @@
registry_address =
chain_spec = evm:bloxberg:8996
trust_address = 0xEb3907eCad74a0013c259D5874AE7f22DcBcC95C
tx_retry_delay = 20

View File

@@ -1,2 +1,2 @@
[SYNCER]
loop_interval = 1
loop_interval =

View File

@@ -1,2 +1,2 @@
[SYNCER]
loop_interval = 1
loop_interval =

View File

@@ -29,7 +29,7 @@ RUN /usr/local/bin/python -m pip install --upgrade pip
# python merge_requirements.py | tee merged_requirements.txt
#RUN cd cic-base && \
# pip install $pip_extra_index_url_flag -r ./merged_requirements.txt
RUN pip install $pip_extra_index_url_flag cic-base[full_graph]==0.1.2a60
RUN pip install $pip_extra_index_url_flag cic-base[full_graph]==0.1.2a62
COPY cic-eth/scripts/ scripts/
COPY cic-eth/setup.cfg cic-eth/setup.py ./

View File

@@ -1,8 +1,8 @@
cic-base~=0.1.2a60
cic-base~=0.1.2a62
celery==4.4.7
crypto-dev-signer~=0.4.14a17
confini~=0.3.6rc3
cic-eth-registry~=0.5.4a11
cic-eth-registry~=0.5.4a12
#cic-bancor~=0.0.6
redis==3.5.3
alembic==1.4.2
@@ -16,10 +16,10 @@ semver==2.13.0
websocket-client==0.57.0
moolb~=0.1.1b2
eth-address-index~=0.1.1a7
chainlib~=0.0.2a4
chainlib~=0.0.2a5
hexathon~=0.0.1a7
chainsyncer~=0.0.1a21
chainqueue~=0.0.1a5
chainqueue~=0.0.1a7
pysha3==1.0.2
coincurve==15.0.0
sarafu-faucet~=0.0.2a16
sarafu-faucet~=0.0.2a19

View File

@@ -37,7 +37,7 @@ def test_tx(
eth_rpc,
eth_signer,
agent_roles,
celery_worker,
celery_session_worker,
):
rpc = RPCConnection.connect(default_chain_spec, 'default')

View File

@@ -37,7 +37,7 @@ def test_tx(
eth_rpc,
eth_signer,
agent_roles,
celery_worker,
celery_session_worker,
):
rpc = RPCConnection.connect(default_chain_spec, 'default')

View File

@@ -38,11 +38,12 @@ def test_transfer_api(
agent_roles,
cic_registry,
register_tokens,
register_lookups,
celery_session_worker,
):
#token = CICRegistry.get_address(default_chain_spec, bancor_tokens[0])
foo_token_cache = ERC20Token(eth_rpc, foo_token)
foo_token_cache = ERC20Token(default_chain_spec, eth_rpc, foo_token)
api = Api(str(default_chain_spec), callback_param='transfer', callback_task='cic_eth.callbacks.noop.noop', queue=None)
t = api.transfer(custodial_roles['FOO_TOKEN_GIFTER'], agent_roles['ALICE'], 1024, foo_token_cache.symbol)

View File

@@ -116,7 +116,7 @@ def test_register_account(
init_eth_tester.mine_block()
c = AccountRegistry()
c = AccountRegistry(default_chain_spec)
o = c.have(account_registry, eth_empty_accounts[0], sender_address=call_sender)
r = eth_rpc.do(o)
assert int(strip_0x(r), 16) == 1

View File

@@ -43,7 +43,7 @@ def test_filter_process(
nonce_oracle = RPCNonceOracle(agent_roles['ALICE'], eth_rpc)
init_eth_tester.mine_blocks(13)
c = ERC20(signer=eth_signer, nonce_oracle=nonce_oracle)
c = ERC20(default_chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle)
(tx_hash_hex, o) = c.transfer(foo_token, agent_roles['ALICE'], agent_roles['BOB'], 1024)
eth_rpc.do(o)
o = receipt(tx_hash_hex)
@@ -56,7 +56,7 @@ def test_filter_process(
# external tx
init_eth_tester.mine_blocks(28)
c = ERC20(signer=eth_signer, nonce_oracle=nonce_oracle)
c = ERC20(default_chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle)
(tx_hash_hex, o) = c.transfer(foo_token, agent_roles['ALICE'], agent_roles['BOB'], 512)
eth_rpc.do(o)
o = receipt(tx_hash_hex)

View File

@@ -3,6 +3,7 @@ import logging
import re
# third-party imports
from celery.app.control import Inspect
import celery
# local imports
@@ -15,6 +16,29 @@ logg = logging.getLogger()
sms_tasks_matcher = r"^(cic_notify.tasks.sms)(\.\w+)?"
re_q = r'^cic-notify'
def get_sms_queue_tasks(app, task_prefix='cic_notify.tasks.sms.'):
host_queues = []
i = Inspect(app=app)
qs = i.active_queues()
for host in qs.keys():
for q in qs[host]:
if re.match(re_q, q['name']):
host_queues.append((host, q['name'],))
task_prefix_len = len(task_prefix)
queue_tasks = []
for (host, queue) in host_queues:
i = Inspect(app=app, destination=[host])
for tasks in i.registered_tasks().values():
for task in tasks:
if len(task) >= task_prefix_len and task[:task_prefix_len] == task_prefix:
queue_tasks.append((queue, task,))
return queue_tasks
class Api:
# TODO: Implement callback strategy
def __init__(self, queue='cic-notify'):
@@ -22,17 +46,9 @@ class Api:
:param queue: The queue on which to execute notification tasks
:type queue: str
"""
registered_tasks = app.tasks
self.sms_tasks = []
self.sms_tasks = get_sms_queue_tasks(app)
logg.debug('sms tasks {}'.format(self.sms_tasks))
for task in registered_tasks.keys():
logg.debug(f'Found: {task} {registered_tasks[task]}')
match = re.match(sms_tasks_matcher, task)
if match:
self.sms_tasks.append(task)
self.queue = queue
logg.info(f'api using queue: {self.queue}')
def sms(self, message, recipient):
"""This function chains all sms tasks in order to send a message, log and persist said data to disk
@@ -44,12 +60,17 @@ class Api:
:rtype: Celery.Task
"""
signatures = []
for task in self.sms_tasks:
signature = celery.signature(task)
for q in self.sms_tasks:
signature = celery.signature(
q[1],
[
message,
recipient,
],
queue=q[0],
)
signatures.append(signature)
signature_group = celery.group(signatures)
result = signature_group.apply_async(
args=[message, recipient],
queue=self.queue
)
return result
t = celery.group(signatures)()
return t

View File

@@ -0,0 +1,76 @@
# standard imports
import sys
import os
import logging
import argparse
import tempfile
# external imports
import celery
import confini
# local imports
from cic_notify.api import Api
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
config_dir = os.path.join('/usr/local/etc/cic-notify')
argparser = argparse.ArgumentParser()
argparser.add_argument('-c', type=str, default=config_dir, help='config file')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('-v', action='store_true', help='be verbose')
argparser.add_argument('-vv', action='store_true', help='be more verbose')
argparser.add_argument('recipient', type=str, help='notification recipient')
argparser.add_argument('message', type=str, help='message text')
args = argparser.parse_args()
if args.vv:
logging.getLogger().setLevel(logging.DEBUG)
elif args.v:
logging.getLogger().setLevel(logging.INFO)
config = confini.Config(args.c, args.env_prefix)
config.process()
config.censor('PASSWORD', 'DATABASE')
config.add(args.recipient, '_RECIPIENT', True)
config.add(args.message, '_MESSAGE', True)
# set up celery
app = celery.Celery(__name__)
broker = config.get('CELERY_BROKER_URL')
if broker[:4] == 'file':
bq = tempfile.mkdtemp()
bp = tempfile.mkdtemp()
app.conf.update({
'broker_url': broker,
'broker_transport_options': {
'data_folder_in': bq,
'data_folder_out': bq,
'data_folder_processed': bp,
},
},
)
logg.warning('celery broker dirs queue i/o {} processed {}, will NOT be deleted on shutdown'.format(bq, bp))
else:
app.conf.update({
'broker_url': broker,
})
result = config.get('CELERY_RESULT_URL')
if result[:4] == 'file':
rq = tempfile.mkdtemp()
app.conf.update({
'result_backend': 'file://{}'.format(rq),
})
logg.warning('celery backend store dir {} created, will NOT be deleted on shutdown'.format(rq))
else:
app.conf.update({
'result_backend': result,
})
if __name__ == '__main__':
a = Api()
t = a.sms(config.get('_RECIPIENT'), config.get('_MESSAGE'))

View File

@@ -6,7 +6,7 @@ RUN apt-get update && \
WORKDIR /usr/src/cic-notify
ARG pip_extra_index_url_flag='--index https://pypi.org/simple --extra-index-url https://pip.grassrootseconomics.net:8433'
RUN pip install $pip_extra_index_url_flag cic-base[full_graph]==0.1.2a44
RUN pip install $pip_extra_index_url_flag cic-base[full_graph]==0.1.2a62
COPY cic-notify/setup.cfg \
cic-notify/setup.py \

View File

@@ -1 +1 @@
cic_base[full_graph]~=0.1.2a46
cic_base[full_graph]~=0.1.2a61

View File

@@ -1,6 +1,6 @@
[metadata]
name = cic-notify
version= 0.4.0a2
version= 0.4.0a3
description = CIC notifications service
author = Louis Holbrook
author_email = dev@holbrook.no
@@ -45,3 +45,4 @@ testing =
[options.entry_points]
console_scripts =
cic-notify-tasker = cic_notify.runnable.tasker:main
cic-notify-send = cic_notify.runnable.send:main

View File

@@ -27,7 +27,7 @@ def define_account_tx_metadata(user: User):
if account_metadata:
account_metadata = json.loads(account_metadata)
person = Person()
deserialized_person = person.deserialize(metadata=account_metadata)
deserialized_person = person.deserialize(person_data=account_metadata)
given_name = deserialized_person.given_name
family_name = deserialized_person.family_name
phone_number = deserialized_person.tel
@@ -46,4 +46,4 @@ def retrieve_account_statement(blockchain_address: str):
callback_task='cic_ussd.tasks.callback_handler.process_statement_callback',
callback_param=blockchain_address
)
result = cic_eth_api.list(address=blockchain_address, limit=9)
cic_eth_api.list(address=blockchain_address, limit=9)

View File

@@ -128,8 +128,8 @@
},
"22": {
"description": "Pin entry menu.",
"display_key": "ussd.kenya.standard_pin_authorization",
"name": "standard_pin_authorization",
"display_key": "ussd.kenya.display_metadata_pin_authorization",
"name": "display_metadata_pin_authorization",
"parent": "start"
},
"23": {
@@ -230,9 +230,22 @@
},
"39": {
"description": "Menu to instruct users to call the office.",
"display_key": "ussd.key.help",
"display_key": "ussd.kenya.help",
"name": "help",
"parent": null
},
"40": {
"description": "Menu to display a user's entire profile",
"display_key": "ussd.kenya.display_user_metadata",
"name": "display_user_metadata",
"parent": "account_management"
},
"41": {
"description": "The recipient is not in the system",
"display_key": "ussd.kenya.exit_invalid_recipient",
"name": "exit_invalid_recipient",
"parent": null
}
}
}

View File

@@ -92,7 +92,7 @@ class UserMetadata:
# validate data
person = Person()
deserialized_person = person.deserialize(metadata=json.loads(data))
deserialized_person = person.deserialize(person_data=json.loads(data))
cache_data(key=self.metadata_pointer, data=json.dumps(deserialized_person.serialize()))
elif status == 404:

View File

@@ -15,6 +15,7 @@ from cic_ussd.balance import BalanceManager, compute_operational_balance, get_ca
from cic_ussd.chain import Chain
from cic_ussd.db.models.user import AccountStatus, User
from cic_ussd.db.models.ussd_session import UssdSession
from cic_ussd.error import UserMetadataNotFoundError
from cic_ussd.menu.ussd_menu import UssdMenu
from cic_ussd.metadata import blockchain_address_to_metadata_pointer
from cic_ussd.phone_number import get_user_by_phone_number
@@ -22,6 +23,7 @@ from cic_ussd.redis import cache_data, create_cached_data_key, get_cached_data
from cic_ussd.state_machine import UssdStateMachine
from cic_ussd.conversions import to_wei, from_wei
from cic_ussd.translation import translation_for
from cic_types.models.person import generate_metadata_pointer, get_contact_data_from_vcard
logg = logging.getLogger(__name__)
@@ -136,7 +138,7 @@ def process_transaction_pin_authorization(user: User, display_key: str, ussd_ses
tx_sender_information = define_account_tx_metadata(user=user)
token_symbol = 'SRF'
user_input = ussd_session.get('user_input').split('*')[-1]
user_input = ussd_session.get('session_data').get('transaction_amount')
transaction_amount = to_wei(value=int(user_input))
logg.debug('Requires integration to determine user tokens.')
return process_pin_authorization(
@@ -187,21 +189,55 @@ def format_transactions(transactions: list, preferred_language: str):
value = transaction.get('to_value')
timestamp = transaction.get('timestamp')
action_tag = transaction.get('action_tag')
direction = transaction.get('direction')
token_symbol = 'SRF'
if action_tag == 'SENT' or action_tag == 'ULITUMA':
formatted_transactions += f'{action_tag} {value} {token_symbol} {recipient_phone_number} {timestamp}.\n'
formatted_transactions += f'{action_tag} {value} {token_symbol} {direction} {recipient_phone_number} {timestamp}.\n'
else:
formatted_transactions += f'{action_tag} {value} {token_symbol} {sender_phone_number} {timestamp}. \n'
formatted_transactions += f'{action_tag} {value} {token_symbol} {direction} {sender_phone_number} {timestamp}. \n'
return formatted_transactions
else:
if preferred_language == 'en':
formatted_transactions = 'Empty'
formatted_transactions = 'NO TRANSACTION HISTORY'
else:
formatted_transactions = 'Hamna historia'
formatted_transactions = 'HAMNA RIPOTI YA MATUMIZI'
return formatted_transactions
def process_display_user_metadata(user: User, display_key: str):
"""
:param user:
:type user:
:param display_key:
:type display_key:
"""
key = generate_metadata_pointer(
identifier=blockchain_address_to_metadata_pointer(blockchain_address=user.blockchain_address),
cic_type='cic.person'
)
user_metadata = get_cached_data(key)
if user_metadata:
user_metadata = json.loads(user_metadata)
contact_data = get_contact_data_from_vcard(vcard=user_metadata.get('vcard'))
logg.debug(f'{contact_data}')
full_name = f'{contact_data.get("given")} {contact_data.get("family")}'
gender = user_metadata.get('gender')
products = ', '.join(user_metadata.get('products'))
location = user_metadata.get('location').get('area_name')
return translation_for(
key=display_key,
preferred_language=user.preferred_language,
full_name=full_name,
gender=gender,
location=location,
products=products
)
else:
raise UserMetadataNotFoundError(f'Expected user metadata but found none in cache for key: {user.blockchain_address}')
def process_account_statement(user: User, display_key: str, ussd_session: dict):
"""
:param user:
@@ -229,7 +265,7 @@ def process_account_statement(user: User, display_key: str, ussd_session: dict):
middle_transaction_set += transactions[3:][:3]
first_transaction_set += transactions[:3]
# there are probably much cleaner and operational inexpensive ways to do this so find them
elif 4 < len(transactions) < 7:
elif 3 < len(transactions) < 7:
middle_transaction_set += transactions[3:]
first_transaction_set += transactions[:3]
else:
@@ -349,18 +385,27 @@ def process_request(user_input: str, user: User, ussd_session: Optional[dict] =
if user.has_valid_pin():
last_ussd_session = retrieve_most_recent_ussd_session(phone_number=user.phone_number)
key = create_cached_data_key(
key = generate_metadata_pointer(
identifier=blockchain_address_to_metadata_pointer(blockchain_address=user.blockchain_address),
salt='cic.person'
cic_type='cic.person'
)
logg.debug(f'METADATA POINTER: {key}')
user_metadata = get_cached_data(key=key)
logg.debug(f'METADATA: {user_metadata}')
if last_ussd_session:
# get last state
last_state = last_ussd_session.state
logg.debug(f'LAST USSD SESSION STATE: {last_state}')
# if last state is account_creation_prompt and metadata exists, show start menu
if last_state == 'account_creation_prompt' and user_metadata is not None:
if last_state in [
'account_creation_prompt',
'exit',
'exit_invalid_pin',
'exit_invalid_new_pin',
'exit_pin_mismatch',
'exit_invalid_request'
] and user_metadata is not None:
return UssdMenu.find_by_name(name='start')
else:
return UssdMenu.find_by_name(name=last_state)
@@ -420,9 +465,13 @@ def custom_display_text(
return process_start_menu(display_key=display_key, user=user)
elif 'pin_authorization' in menu_name:
return process_pin_authorization(display_key=display_key, user=user)
elif 'enter_current_pin' in menu_name:
return process_pin_authorization(display_key=display_key, user=user)
elif menu_name == 'account_balances':
return process_account_balances(display_key=display_key, user=user, ussd_session=ussd_session)
elif 'transaction_set' in menu_name:
return process_account_statement(display_key=display_key, user=user, ussd_session=ussd_session)
elif menu_name == 'display_user_metadata':
return process_display_user_metadata(display_key=display_key, user=user)
else:
return translation_for(key=display_key, preferred_language=user.preferred_language)

View File

@@ -35,7 +35,8 @@ from cic_ussd.requests import (get_request_endpoint,
process_pin_reset_requests)
from cic_ussd.session.ussd_session import UssdSession as InMemoryUssdSession
from cic_ussd.state_machine import UssdStateMachine
from cic_ussd.validator import check_ip, check_request_content_length, check_service_code, validate_phone_number
from cic_ussd.validator import check_ip, check_request_content_length, check_service_code, validate_phone_number, \
validate_presence
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
@@ -101,9 +102,15 @@ InMemoryUssdSession.redis_cache = InMemoryStore.cache
UserMetadata.base_url = config.get('CIC_META_URL')
# define signer values
Signer.gpg_path = config.get('PGP_EXPORT_DIR')
export_dir = config.get('PGP_EXPORT_DIR')
if export_dir:
validate_presence(path=export_dir)
Signer.gpg_path = export_dir
Signer.gpg_passphrase = config.get('PGP_PASSPHRASE')
Signer.key_file_path = f"{config.get('PGP_KEYS_PATH')}{config.get('PGP_PRIVATE_KEYS')}"
key_file_path = f"{config.get('PGP_KEYS_PATH')}{config.get('PGP_PRIVATE_KEYS')}"
if key_file_path:
validate_presence(path=key_file_path)
Signer.key_file_path = key_file_path
# initialize celery app
celery.Celery(backend=config.get('CELERY_RESULT_URL'), broker=config.get('CELERY_BROKER_URL'))

View File

@@ -16,6 +16,7 @@ from cic_ussd.metadata.signer import Signer
from cic_ussd.metadata.user import UserMetadata
from cic_ussd.redis import InMemoryStore
from cic_ussd.session.ussd_session import UssdSession as InMemoryUssdSession
from cic_ussd.validator import validate_presence
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
@@ -65,9 +66,15 @@ InMemoryUssdSession.redis_cache = InMemoryStore.cache
UserMetadata.base_url = config.get('CIC_META_URL')
# define signer values
Signer.gpg_path = config.get('PGP_EXPORT_DIR')
export_dir = config.get('PGP_EXPORT_DIR')
if export_dir:
validate_presence(path=export_dir)
Signer.gpg_path = export_dir
Signer.gpg_passphrase = config.get('PGP_PASSPHRASE')
Signer.key_file_path = f"{config.get('PGP_KEYS_PATH')}{config.get('PGP_PRIVATE_KEYS')}"
key_file_path = f"{config.get('PGP_KEYS_PATH')}{config.get('PGP_PRIVATE_KEYS')}"
if key_file_path:
validate_presence(path=key_file_path)
Signer.key_file_path = key_file_path
# set up celery
current_app = celery.Celery(__name__)

View File

@@ -78,9 +78,10 @@ def save_recipient_phone_to_session_data(state_machine_data: Tuple[str, dict, Us
:type state_machine_data: str
"""
user_input, ussd_session, user = state_machine_data
session_data = {
'recipient_phone_number': user_input
}
session_data = ussd_session.get('session_data') or {}
session_data['recipient_phone_number'] = user_input
save_to_in_memory_ussd_session_data(queue='cic-ussd', session_data=session_data, ussd_session=ussd_session)
@@ -109,9 +110,10 @@ def save_transaction_amount_to_session_data(state_machine_data: Tuple[str, dict,
:type state_machine_data: str
"""
user_input, ussd_session, user = state_machine_data
session_data = {
'transaction_amount': user_input
}
session_data = ussd_session.get('session_data') or {}
session_data['transaction_amount'] = user_input
save_to_in_memory_ussd_session_data(queue='cic-ussd', session_data=session_data, ussd_session=ussd_session)

View File

@@ -192,7 +192,7 @@ def edit_user_metadata_attribute(state_machine_data: Tuple[str, dict, User]):
# validate user metadata
person = Person()
user_metadata = json.loads(user_metadata)
deserialized_person = person.deserialize(metadata=user_metadata)
deserialized_person = person.deserialize(person_data=user_metadata)
# edit specific metadata attribute
if given_name:

View File

@@ -143,14 +143,18 @@ def define_transaction_action_tag(
# check preferred language
if preferred_language == 'en':
action_tag = 'SENT'
direction = 'TO'
else:
action_tag = 'ULITUMA'
direction = 'KWA'
else:
if preferred_language == 'en':
action_tag = 'RECEIVED'
direction = 'FROM'
else:
action_tag = 'ULIPOKEA'
return action_tag
direction = 'KUTOKA'
return action_tag, direction
@celery_app.task
@@ -175,15 +179,17 @@ def process_statement_callback(result, param: str, status_code: int):
# check if sender is in the system
sender: User = session.query(User).filter_by(blockchain_address=sender_blockchain_address).first()
owner: User = session.query(User).filter_by(blockchain_address=param).first()
if sender:
processed_transaction['sender_phone_number'] = sender.phone_number
action_tag = define_transaction_action_tag(
preferred_language=sender.preferred_language,
action_tag, direction = define_transaction_action_tag(
preferred_language=owner.preferred_language,
sender_blockchain_address=sender_blockchain_address,
param=param
)
processed_transaction['action_tag'] = action_tag
processed_transaction['direction'] = direction
else:
processed_transaction['sender_phone_number'] = 'GRASSROOTS ECONOMICS'
@@ -197,8 +203,8 @@ def process_statement_callback(result, param: str, status_code: int):
logg.warning(f'Tx with recipient not found in cic-ussd')
# add transaction values
processed_transaction['to_value'] = from_wei(value=transaction.get('to_value'))
processed_transaction['from_value'] = from_wei(value=transaction.get('from_value'))
processed_transaction['to_value'] = from_wei(value=transaction.get('to_value')).__str__()
processed_transaction['from_value'] = from_wei(value=transaction.get('from_value')).__str__()
raw_timestamp = transaction.get('timestamp')
timestamp = datetime.utcfromtimestamp(raw_timestamp).strftime('%d/%m/%y, %H:%M')

View File

@@ -47,7 +47,7 @@ def to_wei(value: int) -> int:
:return: Wei equivalent of value in SRF
:rtype: int
"""
return int(value * 1e+18)
return int(value * 1e+6)
class IncomingTransactionProcessor:

View File

@@ -1,5 +1,6 @@
# standard imports
import logging
import os
import re
# third-party imports
@@ -110,7 +111,7 @@ def validate_phone_number(phone: str):
def validate_response_type(processor_response: str) -> bool:
"""1*3443*3443*Philip*Wanga*1*Juja*Software Developer*2*3
"""
This function checks the prefix for a corresponding menu's text from the response offered by the Ussd Processor and
determines whether the response should prompt the end of a ussd session or the
:param processor_response: A ussd menu's text value.
@@ -126,3 +127,14 @@ def validate_response_type(processor_response: str) -> bool:
return True
return False
def validate_presence(path: str):
"""
"""
is_present = os.path.exists(path=path)
if not is_present:
raise ValueError(f'Directory/File in path: {path} not found.')
else:
logg.debug(f'Loading data from: {path}')

View File

@@ -4,5 +4,6 @@
"enter_gender",
"enter_age",
"enter_location",
"enter_products"
"enter_products",
"display_metadata_pin_authorization"
]

View File

@@ -82,7 +82,7 @@ def test_format_user_metadata(create_activated_user,
from cic_types.models.person import Person
formatted_user_metadata = format_user_metadata(metadata=complete_user_metadata, user=create_activated_user)
person = Person()
user_metadata = person.deserialize(metadata=formatted_user_metadata)
user_metadata = person.deserialize(person_data=formatted_user_metadata)
assert formatted_user_metadata == user_metadata.serialize()

View File

@@ -38,7 +38,7 @@
{
"trigger": "scan_data",
"source": "exit_invalid_recipient",
"dest": "send_enter_recipient",
"dest": "enter_transaction_recipient",
"conditions": "cic_ussd.state_machine.logic.menu.menu_zero_zero_selected"
},
{
@@ -49,13 +49,13 @@
"after": "cic_ussd.state_machine.logic.sms.upsell_unregistered_recipient"
},
{
"trigger": "feed_char",
"trigger": "scan_data",
"source": "exit_successful_transaction",
"dest": "start",
"conditions": "cic_ussd.state_machine.logic.menu.menu_zero_zero_selected"
},
{
"trigger": "feed_char",
"trigger": "scan_data",
"source": "exit_successful_transaction",
"dest": "exit",
"conditions": "cic_ussd.state_machine.logic.menu.menu_ninety_nine_selected"

View File

@@ -26,18 +26,18 @@
{
"trigger": "scan_data",
"source": "metadata_management",
"dest": "standard_pin_authorization",
"dest": "display_metadata_pin_authorization",
"conditions": "cic_ussd.state_machine.logic.menu.menu_five_selected"
},
{
"trigger": "scan_data",
"source": "standard_pin_authorization",
"source": "display_metadata_pin_authorization",
"dest": "display_user_metadata",
"conditions": "cic_ussd.state_machine.logic.pin.is_authorized_pin"
},
{
"trigger": "scan_data",
"source": "standard_pin_authorization",
"source": "display_metadata_pin_authorization",
"dest": "exit_pin_blocked",
"conditions": "cic_ussd.state_machine.logic.pin.is_locked_account"
},

View File

@@ -55,8 +55,8 @@ en:
4. Edit products
5. View my profile
0. Back
display_user_profile_data: |-
END Your details are:
display_user_metadata: |-
CON Your details are:
Name: %{full_name}
Gender: %{gender}
Location: %{location}
@@ -85,7 +85,7 @@ en:
retry: |-
CON Please enter your PIN. You have %{remaining_attempts} attempts remaining.
0. Back
standard_pin_authorization:
display_metadata_pin_authorization:
first: |-
CON Please enter your PIN.
0. Back

View File

@@ -56,7 +56,7 @@ sw:
5. Angalia wasifu wako
0. Nyuma
display_user_metadata: |-
END Wasifu wako una maelezo yafuatayo:
CON Wasifu wako una maelezo yafuatayo:
Jina: %{full_name}
Jinsia: %{gender}
Eneo: %{location}

View File

@@ -57,9 +57,9 @@ WORKDIR /home/grassroots
USER grassroots
ARG pip_extra_index_url=https://pip.grassrootseconomics.net:8433
ARG cic_base_version=0.1.2a60
ARG cic_eth_version=0.11.0a4
ARG sarafu_faucet_version=0.0.2a16
ARG cic_base_version=0.1.2a62
ARG cic_eth_version=0.11.0b1
ARG sarafu_faucet_version=0.0.2a19
ARG cic_contracts_version=0.0.2a2
RUN pip install --user --extra-index-url $pip_extra_index_url cic-base[full_graph]==$cic_base_version \
cic-eth==$cic_eth_version \

View File

@@ -6,6 +6,7 @@ DEV_ETH_ACCOUNT_CONTRACT_DEPLOYER=0xEb3907eCad74a0013c259D5874AE7f22DcBcC95C
DEV_ETH_ACCOUNT_RESERVE_MINTER=${DEV_ETH_ACCOUNT_RESERVE_MINTER:-$DEV_ETH_ACCOUNT_CONTRACT_DEPLOYER}
DEV_ETH_ACCOUNT_ACCOUNTS_INDEX_WRITER=${DEV_ETH_ACCOUNT_RESERVE_MINTER:-$DEV_ETH_ACCOUNT_CONTRACT_DEPLOYER}
DEV_RESERVE_AMOUNT=${DEV_ETH_RESERVE_AMOUNT:-""10000000000000000000000000000000000}
faucet_amount=${DEV_FAUCET_AMOUNT:-0}
keystore_file=$(realpath ./keystore/UTC--2021-01-08T17-18-44.521011372Z--eb3907ecad74a0013c259d5874ae7f22dcbcc95c)
echo "environment:"
@@ -71,6 +72,8 @@ if [[ -n "${ETH_PROVIDER}" ]]; then
# Sarafu faucet contract
>&2 echo "deploy token faucet contract"
DEV_FAUCET_ADDRESS=`sarafu-faucet-deploy -y $keystore_file -i $CIC_CHAIN_SPEC -p $ETH_PROVIDER -w -v --account-index-address $DEV_ACCOUNT_INDEX_ADDRESS $DEV_RESERVE_ADDRESS`
>&2 echo "set token faucet amount"
sarafu-faucet-set -y $keystore_file -i $CIC_CHAIN_SPEC -p $ETH_PROVIDER -a $DEV_FAUCET_ADDRESS $faucet_amount
eth-contract-registry-set -w -y $keystore_file -r $CIC_REGISTRY_ADDRESS -i $CIC_CHAIN_SPEC -p $ETH_PROVIDER -vv Faucet $DEV_FAUCET_ADDRESS
>&2 echo "set faucet as token minter"
giftable-token-minter -w -y $keystore_file -a $DEV_RESERVE_ADDRESS -i $CIC_CHAIN_SPEC -p $ETH_PROVIDER -vv $DEV_FAUCET_ADDRESS

View File

@@ -88,7 +88,7 @@ signer = EIP155Signer(keystore)
nonce_oracle = RPCNonceOracle(signer_address, rpc)
registry = Registry()
registry = Registry(chain_spec)
o = registry.address_of(config.get('CIC_REGISTRY_ADDRESS'), 'AccountRegistry')
r = rpc.do(o)
account_registry_address = registry.parse_address_of(r)

View File

@@ -124,10 +124,6 @@ def register_eth(i, u):
return address
def register_ussd(u):
pass
if __name__ == '__main__':
#fi = open(os.path.join(user_out_dir, 'addresses.csv'), 'a')
@@ -155,8 +151,6 @@ if __name__ == '__main__':
sub_chain_str = '{}:{}'.format(chain_spec.common_name(), chain_spec.network_id())
u.identities['evm'][sub_chain_str] = [new_address]
register_ussd(u)
new_address_clean = strip_0x(new_address)
filepath = os.path.join(
user_new_dir,

View File

@@ -1,5 +1,5 @@
cic-base[full_graph]==0.1.2a60
sarafu-faucet==0.0.2a16
cic-eth==0.11.0a4
cic-base[full_graph]==0.1.2a61
sarafu-faucet==0.0.2a19
cic-eth==0.11.0b1
cic-types==0.1.0a10
crypto-dev-signer==0.4.14a17

View File

@@ -21,8 +21,6 @@ debug='-vv'
abi_dir=${ETH_ABI_DIR:-/usr/local/share/cic/solidity/abi}
gas_amount=100000000000000000000000
token_amount=${gas_amount}
#faucet_amount=1000000000
faucet_amount=${DEV_FAUCET_AMOUNT:-0}
env_out_file=${CIC_DATA_DIR}/.env_seed
init_level_file=${CIC_DATA_DIR}/.init
truncate $env_out_file -s 0