Consolidated USSD data seeding script

This commit is contained in:
Philip Wafula 2021-08-29 09:55:47 +00:00
parent 2afb20e715
commit 92cc6a3f27
18 changed files with 520 additions and 673 deletions

View File

@@ -11,12 +11,12 @@ celery_app = celery.current_app
@celery_app.task @celery_app.task
def persist_notification(recipient, message): def persist_notification(message, recipient):
""" """
:param recipient:
:type recipient:
:param message: :param message:
:type message: :type message:
:param recipient:
:type recipient:
:return: :return:
:rtype: :rtype:
""" """

View File

@@ -11,12 +11,13 @@ local_logg = logging.getLogger(__name__)
@celery_app.task @celery_app.task
def log(recipient, message): def log(message, recipient):
""" """
:param recipient:
:type recipient:
:param message: :param message:
:type message: :type message:
:param recipient:
:type recipient:
:return: :return:
:rtype: :rtype:
""" """

View File

@@ -1,5 +1,6 @@
# standard import # standard import
import decimal import decimal
import json
import logging import logging
from typing import Dict, Tuple from typing import Dict, Tuple
@@ -8,6 +9,8 @@ from cic_eth.api import Api
from sqlalchemy.orm.session import Session from sqlalchemy.orm.session import Session
# local import # local import
from cic_ussd.account.chain import Chain
from cic_ussd.account.tokens import get_cached_default_token
from cic_ussd.db.models.account import Account from cic_ussd.db.models.account import Account
from cic_ussd.db.models.base import SessionBase from cic_ussd.db.models.base import SessionBase
from cic_ussd.error import UnknownUssdRecipient from cic_ussd.error import UnknownUssdRecipient
@@ -59,7 +62,9 @@ def from_wei(value: int) -> float:
:return: SRF equivalent of value in Wei :return: SRF equivalent of value in Wei
:rtype: float :rtype: float
""" """
value = float(value) / 1e+6 cached_token_data = json.loads(get_cached_default_token(Chain.spec.__str__()))
token_decimals: int = cached_token_data.get('decimals')
value = float(value) / (10**token_decimals)
return truncate(value=value, decimals=2) return truncate(value=value, decimals=2)
@@ -70,7 +75,9 @@ def to_wei(value: int) -> int:
:return: Wei equivalent of value in SRF :return: Wei equivalent of value in SRF
:rtype: int :rtype: int
""" """
return int(value * 1e+6) cached_token_data = json.loads(get_cached_default_token(Chain.spec.__str__()))
token_decimals: int = cached_token_data.get('decimals')
return int(value * (10**token_decimals))
def truncate(value: float, decimals: int): def truncate(value: float, decimals: int):
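The scaling change above replaces the hardcoded 1e+6 divisor with the token's cached decimals value. A minimal sketch of the new behavior, assuming a cached entry shaped like {"decimals": 6} (the helper signatures here are illustrative, not the module's API):

    import json

    def from_wei(value: int, cached_token_data: str) -> float:
        # divide by 10^decimals instead of the old fixed 1e+6;
        # the real from_wei additionally truncates the result to 2 decimal places
        token_decimals: int = json.loads(cached_token_data).get('decimals')
        return float(value) / (10 ** token_decimals)

    def to_wei(value: float, cached_token_data: str) -> int:
        token_decimals: int = json.loads(cached_token_data).get('decimals')
        return int(value * (10 ** token_decimals))

    assert from_wei(1_500_000, '{"decimals": 6}') == 1.5
    assert to_wei(1.5, '{"decimals": 6}') == 1_500_000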

View File

@@ -44,7 +44,7 @@ class MetadataRequestsHandler(Metadata):
def create(self, data: Union[Dict, str]): def create(self, data: Union[Dict, str]):
"""""" """"""
data = json.dumps(data) data = json.dumps(data).encode('utf-8')
result = make_request(method='POST', url=self.url, data=data, headers=self.headers) result = make_request(method='POST', url=self.url, data=data, headers=self.headers)
error_handler(result=result) error_handler(result=result)
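The one-line fix above encodes the serialized payload to bytes before the POST; a quick illustration of the difference (the payload contents are made up):

    import json

    payload = {'vcard': 'QkVHSU46VkNBUkQ='}           # hypothetical metadata body
    data = json.dumps(payload)                        # str, what was sent before
    data_bytes = json.dumps(payload).encode('utf-8')  # bytes, what the POST now sends
    assert isinstance(data, str) and isinstance(data_bytes, bytes)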

View File

@@ -146,7 +146,7 @@ def create_ussd_session(
) )
def update_ussd_session(ussd_session: UssdSession, def update_ussd_session(ussd_session: DbUssdSession,
user_input: str, user_input: str,
state: str, state: str,
data: Optional[dict] = None) -> UssdSession: data: Optional[dict] = None) -> UssdSession:

View File

@@ -138,26 +138,14 @@ def transaction_balances_callback(self, result: list, param: dict, status_code:
balances_data = result[0] balances_data = result[0]
available_balance = calculate_available_balance(balances_data) available_balance = calculate_available_balance(balances_data)
transaction = param transaction = param
blockchain_address = transaction.get('blockchain_address')
transaction['available_balance'] = available_balance transaction['available_balance'] = available_balance
queue = self.request.delivery_info.get('routing_key') queue = self.request.delivery_info.get('routing_key')
s_preferences_metadata = celery.signature(
'cic_ussd.tasks.metadata.query_preferences_metadata', [blockchain_address], queue=queue
)
s_process_account_metadata = celery.signature( s_process_account_metadata = celery.signature(
'cic_ussd.tasks.processor.parse_transaction', [transaction], queue=queue 'cic_ussd.tasks.processor.parse_transaction', [transaction], queue=queue
) )
s_notify_account = celery.signature('cic_ussd.tasks.notifications.transaction', queue=queue) s_notify_account = celery.signature('cic_ussd.tasks.notifications.transaction', queue=queue)
celery.chain(s_process_account_metadata, s_notify_account).apply_async()
if transaction.get('transaction_type') == 'transfer':
celery.chain(s_preferences_metadata, s_process_account_metadata, s_notify_account).apply_async()
if transaction.get('transaction_type') == 'tokengift':
s_process_account_metadata = celery.signature(
'cic_ussd.tasks.processor.parse_transaction', [{}, transaction], queue=queue
)
celery.chain(s_process_account_metadata, s_notify_account).apply_async()
@celery_app.task @celery_app.task
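With the preferences lookup folded into parse_transaction, the per-transaction-type branching above collapses into a single chain; the pattern in isolation (task names from the diff, the wrapper function is hypothetical):

    import celery

    def notify_for_transaction(transaction: dict, queue: str):
        # one chain now serves both 'transfer' and 'tokengift' transactions
        s_parse = celery.signature(
            'cic_ussd.tasks.processor.parse_transaction', [transaction], queue=queue)
        s_notify = celery.signature(
            'cic_ussd.tasks.notifications.transaction', queue=queue)
        celery.chain(s_parse, s_notify).apply_async()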

View File

@@ -8,6 +8,7 @@ import i18n
from chainlib.hash import strip_0x from chainlib.hash import strip_0x
# local imports # local imports
from cic_ussd.account.metadata import get_cached_preferred_language
from cic_ussd.account.statement import get_cached_statement from cic_ussd.account.statement import get_cached_statement
from cic_ussd.account.transaction import aux_transaction_data, validate_transaction_account from cic_ussd.account.transaction import aux_transaction_data, validate_transaction_account
from cic_ussd.cache import cache_data, cache_data_key from cic_ussd.cache import cache_data, cache_data_key
@@ -58,19 +59,17 @@ def cache_statement(parsed_transaction: dict, querying_party: str):
@celery_app.task @celery_app.task
def parse_transaction(preferences: dict, transaction: dict) -> dict: def parse_transaction(transaction: dict) -> dict:
"""This function parses transaction objects and collates all relevant data for system use i.e: """This function parses transaction objects and collates all relevant data for system use i.e:
- An account's set preferred language. - An account's set preferred language.
- Account identifier that facilitates notification. - Account identifier that facilitates notification.
- Contextual tags i.e action and direction tags. - Contextual tags i.e action and direction tags.
:param preferences: An account's set preferences.
:type preferences: dict
:param transaction: Transaction object. :param transaction: Transaction object.
:type transaction: dict :type transaction: dict
:return: Transaction object with contextual data for use in the system. :return: Transaction object with contextual data for use in the system.
:rtype: dict :rtype: dict
""" """
preferred_language = preferences.get('preferred_language') preferred_language = get_cached_preferred_language(transaction.get('blockchain_address'))
if not preferred_language: if not preferred_language:
preferred_language = i18n.config.get('fallback') preferred_language = i18n.config.get('fallback')
transaction['preferred_language'] = preferred_language transaction['preferred_language'] = preferred_language
@@ -83,6 +82,8 @@ def parse_transaction(preferences: dict, transaction: dict) -> dict:
alt_account = session.query(Account).filter_by(blockchain_address=alt_blockchain_address).first() alt_account = session.query(Account).filter_by(blockchain_address=alt_blockchain_address).first()
if alt_account: if alt_account:
transaction['alt_metadata_id'] = alt_account.standard_metadata_id() transaction['alt_metadata_id'] = alt_account.standard_metadata_id()
else:
transaction['alt_metadata_id'] = 'GRASSROOTS ECONOMICS'
transaction['metadata_id'] = account.standard_metadata_id() transaction['metadata_id'] = account.standard_metadata_id()
transaction['phone_number'] = account.phone_number transaction['phone_number'] = account.phone_number
session.close() session.close()
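parse_transaction now resolves the language itself: cached preference first, configured i18n fallback otherwise. The fallback logic in isolation (the dict-backed cache below stands in for get_cached_preferred_language):

    import i18n

    def resolve_preferred_language(cache: dict, blockchain_address: str) -> str:
        preferred_language = cache.get(blockchain_address)
        if not preferred_language:
            preferred_language = i18n.config.get('fallback')
        return preferred_language

    i18n.set('fallback', 'en')
    assert resolve_preferred_language({}, '0xdead') == 'en'
    assert resolve_preferred_language({'0xdead': 'sw'}, '0xdead') == 'sw'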

View File

@@ -1,64 +1,61 @@
# standard imports
import argparse import argparse
import logging import logging
import sys
import os import os
import sys
# external imports # external imports
import celery import celery
import confini
import redis
from chainlib.chain import ChainSpec from chainlib.chain import ChainSpec
from chainlib.eth.address import to_checksum_address from chainlib.eth.address import to_checksum_address
from chainlib.eth.connection import EthHTTPConnection from chainlib.eth.connection import EthHTTPConnection
from confini import Config
from crypto_dev_signer.eth.signer import ReferenceSigner as EIP155Signer from crypto_dev_signer.eth.signer import ReferenceSigner as EIP155Signer
from crypto_dev_signer.keystore.dict import DictKeystore from crypto_dev_signer.keystore.dict import DictKeystore
# local imports # local imports
from import_task import ImportTask, MetadataTask
from import_util import BalanceProcessor, get_celery_worker_status from import_util import BalanceProcessor, get_celery_worker_status
from import_task import ImportTask, MetadataTask
logging.basicConfig(level=logging.WARNING) default_config_dir = './config'
logg = logging.getLogger() logg = logging.getLogger()
config_dir = './config' arg_parser = argparse.ArgumentParser(description='Daemon worker that handles data seeding tasks.')
arg_parser.add_argument('-c', type=str, default=default_config_dir, help='config root to use.')
arg_parser.add_argument('--env-prefix',
default=os.environ.get('CONFINI_ENV_PREFIX'),
dest='env_prefix',
type=str,
help='environment prefix for variables to overwrite configuration.')
arg_parser.add_argument('--head', action='store_true', help='start at current block height (overrides --offset)')
arg_parser.add_argument('-i', '--chain-spec', type=str, dest='i', help='chain spec')
arg_parser.add_argument('--include-balances', dest='include_balances', help='include opening balance transactions',
action='store_true')
arg_parser.add_argument('--meta-host', dest='meta_host', type=str, help='metadata server host')
arg_parser.add_argument('--meta-port', dest='meta_port', type=int, help='metadata server port')
arg_parser.add_argument('-p', '--provider', dest='p', type=str, help='chain rpc provider address')
arg_parser.add_argument('-q', type=str, default='cic-import-ussd', help='celery queue to submit data seeding tasks to.')
arg_parser.add_argument('-r', '--registry-address', type=str, dest='r', help='CIC Registry address')
arg_parser.add_argument('--redis-db', dest='redis_db', type=int, help='redis db to use for task submission and callback')
arg_parser.add_argument('--redis-host', dest='redis_host', type=str, help='redis host to use for task submission')
arg_parser.add_argument('--redis-port', dest='redis_port', type=int, help='redis port to use for task submission')
arg_parser.add_argument('--token-symbol', default='GFT', type=str, dest='token_symbol',
help='Token symbol to use for transactions')
arg_parser.add_argument('-v', help='be verbose', action='store_true')
arg_parser.add_argument('-vv', help='be more verbose', action='store_true')
arg_parser.add_argument('-y', '--key-file', dest='y', type=str, help='Ethereum keystore file to use for signing')
arg_parser.add_argument('--offset', type=int, default=0, help='block offset to start syncer from')
arg_parser.add_argument('--old-chain-spec', type=str, dest='old_chain_spec', default='evm:oldchain:1',
help='chain spec')
arg_parser.add_argument('import_dir', default='out', type=str, help='user export directory')
args = arg_parser.parse_args()
argparser = argparse.ArgumentParser(description='daemon that monitors transactions in new blocks') if args.vv:
argparser.add_argument('-p', '--provider', dest='p', type=str, help='chain rpc provider address') logging.getLogger().setLevel(logging.DEBUG)
argparser.add_argument('-y', '--key-file', dest='y', type=str, help='Ethereum keystore file to use for signing') elif args.v:
argparser.add_argument('-c', type=str, default=config_dir, help='config root to use')
argparser.add_argument('--old-chain-spec', type=str, dest='old_chain_spec', default='evm:oldchain:1', help='chain spec')
argparser.add_argument('-i', '--chain-spec', type=str, dest='i', help='chain spec')
argparser.add_argument('-r', '--registry-address', type=str, dest='r', help='CIC Registry address')
argparser.add_argument('--meta-host', dest='meta_host', type=str, help='metadata server host')
argparser.add_argument('--meta-port', dest='meta_port', type=int, help='metadata server host')
argparser.add_argument('--redis-host', dest='redis_host', type=str, help='redis host to use for task submission')
argparser.add_argument('--redis-port', dest='redis_port', type=int, help='redis host to use for task submission')
argparser.add_argument('--redis-db', dest='redis_db', type=int, help='redis db to use for task submission and callback')
argparser.add_argument('--token-symbol', default='GFT', type=str, dest='token_symbol',
help='Token symbol to use for transactions')
argparser.add_argument('--head', action='store_true', help='start at current block height (overrides --offset)')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str,
help='environment prefix for variables to overwrite configuration')
argparser.add_argument('-q', type=str, default='cic-import-ussd', help='celery queue to submit transaction tasks to')
argparser.add_argument('--offset', type=int, default=0, help='block offset to start syncer from')
argparser.add_argument('-v', help='be verbose', action='store_true')
argparser.add_argument('-vv', help='be more verbose', action='store_true')
argparser.add_argument('user_dir', default='out', type=str, help='user export directory')
args = argparser.parse_args(sys.argv[1:])
if args.v:
logging.getLogger().setLevel(logging.INFO) logging.getLogger().setLevel(logging.INFO)
elif args.vv: config = Config(args.c, args.env_prefix)
logging.getLogger().setLevel(logging.DEBUG)
config_dir = os.path.join(args.c)
os.makedirs(config_dir, 0o777, True)
config = confini.Config(config_dir, args.env_prefix)
config.process() config.process()
# override args
args_override = { args_override = {
'CIC_CHAIN_SPEC': getattr(args, 'i'), 'CIC_CHAIN_SPEC': getattr(args, 'i'),
'ETH_PROVIDER': getattr(args, 'p'), 'ETH_PROVIDER': getattr(args, 'p'),
@@ -73,88 +70,76 @@ args_override = {
config.dict_override(args_override, 'cli flag') config.dict_override(args_override, 'cli flag')
config.censor('PASSWORD', 'DATABASE') config.censor('PASSWORD', 'DATABASE')
config.censor('PASSWORD', 'SSL') config.censor('PASSWORD', 'SSL')
logg.debug('config loaded from {}:\n{}'.format(config_dir, config)) logg.debug(f'config loaded from {args.c}:\n{config}')
redis_host = config.get('REDIS_HOST') db_config = {
redis_port = config.get('REDIS_PORT') 'database': config.get('DATABASE_NAME'),
redis_db = config.get('REDIS_DB') 'host': config.get('DATABASE_HOST'),
r = redis.Redis(redis_host, redis_port, redis_db) 'port': config.get('DATABASE_PORT'),
'user': config.get('DATABASE_USER'),
'password': config.get('DATABASE_PASSWORD')
}
ImportTask.db_config = db_config
# create celery apps
celery_app = celery.Celery(backend=config.get('CELERY_RESULT_URL'), broker=config.get('CELERY_BROKER_URL'))
status = get_celery_worker_status(celery_app=celery_app)
signer_address = None
keystore = DictKeystore() keystore = DictKeystore()
if args.y is not None: os.path.isfile(args.y)
logg.debug('loading keystore file {}'.format(args.y)) logg.debug(f'loading keystore file {args.y}')
signer_address = keystore.import_keystore_file(args.y) signer_address = keystore.import_keystore_file(args.y)
logg.debug('now have key for signer address {}'.format(signer_address)) logg.debug(f'now have key for signer address {signer_address}')
# define signer
signer = EIP155Signer(keystore) signer = EIP155Signer(keystore)
queue = args.q block_offset = -1 if args.head else args.offset
chain_str = config.get('CIC_CHAIN_SPEC')
block_offset = 0
if args.head:
block_offset = -1
else:
block_offset = args.offset
chain_str = config.get('CIC_CHAIN_SPEC')
chain_spec = ChainSpec.from_chain_str(chain_str) chain_spec = ChainSpec.from_chain_str(chain_str)
ImportTask.chain_spec = chain_spec
old_chain_spec_str = args.old_chain_spec old_chain_spec_str = args.old_chain_spec
old_chain_spec = ChainSpec.from_chain_str(old_chain_spec_str) old_chain_spec = ChainSpec.from_chain_str(old_chain_spec_str)
user_dir = args.user_dir # user_out_dir from import_users.py
token_symbol = args.token_symbol
MetadataTask.meta_host = config.get('META_HOST') MetadataTask.meta_host = config.get('META_HOST')
MetadataTask.meta_port = config.get('META_PORT') MetadataTask.meta_port = config.get('META_PORT')
ImportTask.chain_spec = chain_spec
txs_dir = os.path.join(args.import_dir, 'txs')
os.makedirs(txs_dir, exist_ok=True)
sys.stdout.write(f'created txs dir: {txs_dir}\n')
celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL'))
get_celery_worker_status(celery_app)
def main(): def main():
conn = EthHTTPConnection(config.get('ETH_PROVIDER')) conn = EthHTTPConnection(config.get('ETH_PROVIDER'))
ImportTask.balance_processor = BalanceProcessor(conn,
ImportTask.balance_processor = BalanceProcessor(conn, chain_spec, config.get('CIC_REGISTRY_ADDRESS'), chain_spec,
signer_address, signer) config.get('CIC_REGISTRY_ADDRESS'),
ImportTask.balance_processor.init(token_symbol) signer_address,
signer)
# TODO get decimals from token ImportTask.balance_processor.init(args.token_symbol)
balances = {} balances = {}
f = open('{}/balances.csv'.format(user_dir, 'r')) accuracy = 10 ** 6
remove_zeros = 10 ** 6 count = 0
i = 0 with open(f'{args.import_dir}/balances.csv', 'r') as balances_file:
while True: while True:
l = f.readline() line = balances_file.readline()
if l is None: if not line:
break break
r = l.split(',') balance_data = line.split(',')
try: try:
address = to_checksum_address(r[0]) blockchain_address = to_checksum_address(balance_data[0])
sys.stdout.write('loading balance {} {} {}'.format(i, address, r[1]).ljust(200) + "\r") logg.info(
except ValueError: 'loading balance: {} {} {}'.format(count, blockchain_address, balance_data[1].ljust(200) + "\r"))
break except ValueError:
balance = int(int(r[1].rstrip()) / remove_zeros) break
balances[address] = balance balance = int(int(balance_data[1].rstrip()) / accuracy)
i += 1 balances[blockchain_address] = balance
count += 1
f.close()
ImportTask.balances = balances ImportTask.balances = balances
ImportTask.count = i ImportTask.count = count
ImportTask.import_dir = user_dir ImportTask.include_balances = args.include_balances is True
ImportTask.import_dir = args.import_dir
s = celery.signature( s_send_txs = celery.signature(
'import_task.send_txs', 'import_task.send_txs', [ImportTask.balance_processor.nonce_offset], queue=args.q)
[ s_send_txs.apply_async()
MetadataTask.balance_processor.nonce_offset,
],
queue=queue,
)
s.apply_async()
argv = ['worker'] argv = ['worker']
if args.vv: if args.vv:
@@ -165,6 +150,7 @@ def main():
argv.append(args.q) argv.append(args.q)
argv.append('-n') argv.append('-n')
argv.append(args.q) argv.append(args.q)
argv.append(f'--pidfile={args.q}.pid')
celery_app.worker_main(argv) celery_app.worker_main(argv)

View File

@@ -1,71 +1,63 @@
# standard import # standard imports
import argparse import argparse
import csv import csv
import logging import logging
import os import os
import psycopg2
# third-party imports # external imports
import celery from confini import Config
import confini
# local imports # local imports
from import_util import get_celery_worker_status
default_config_dir = './config'
logging.basicConfig(level=logging.WARNING) logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger() logg = logging.getLogger()
default_config_dir = './config' arg_parser = argparse.ArgumentParser(description='Pins import script.')
arg_parser.add_argument('-c', type=str, default=default_config_dir, help='config root to use.')
arg_parser = argparse.ArgumentParser()
arg_parser.add_argument('-c', type=str, default=default_config_dir, help='config root to use')
arg_parser.add_argument('--env-prefix', arg_parser.add_argument('--env-prefix',
default=os.environ.get('CONFINI_ENV_PREFIX'), default=os.environ.get('CONFINI_ENV_PREFIX'),
dest='env_prefix', dest='env_prefix',
type=str, type=str,
help='environment prefix for variables to overwrite configuration') help='environment prefix for variables to overwrite configuration.')
arg_parser.add_argument('-q', type=str, default='cic-import-ussd', help='celery queue to submit transaction tasks to') arg_parser.add_argument('import_dir', default='out', type=str, help='user export directory')
arg_parser.add_argument('-v', help='be verbose', action='store_true') arg_parser.add_argument('-v', help='be verbose', action='store_true')
arg_parser.add_argument('-vv', help='be more verbose', action='store_true') arg_parser.add_argument('-vv', help='be more verbose', action='store_true')
arg_parser.add_argument('pins_dir', default='out', type=str, help='user export directory')
args = arg_parser.parse_args() args = arg_parser.parse_args()
# set log levels if args.vv:
if args.v: logging.getLogger().setLevel(logging.DEBUG)
logg.setLevel(logging.INFO) elif args.v:
elif args.vv: logging.getLogger().setLevel(logging.INFO)
logg.setLevel(logging.DEBUG)
# process configs config = Config(args.c, args.env_prefix)
config_dir = args.c
config = confini.Config(config_dir, os.environ.get('CONFINI_ENV_PREFIX'))
config.process() config.process()
config.censor('PASSWORD', 'DATABASE') config.censor('PASSWORD', 'DATABASE')
logg.debug('config loaded from {}:\n{}'.format(args.c, config)) logg.debug(f'config loaded from {args.c}:\n{config}')
celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL'))
status = get_celery_worker_status(celery_app=celery_app)
db_configs = {
'database': config.get('DATABASE_NAME'),
'host': config.get('DATABASE_HOST'),
'port': config.get('DATABASE_PORT'),
'user': config.get('DATABASE_USER'),
'password': config.get('DATABASE_PASSWORD')
}
def main(): def main():
with open(f'{args.pins_dir}/pins.csv') as pins_file: with open(f'{args.import_dir}/pins.csv') as pins_file:
phone_to_pins = [tuple(row) for row in csv.reader(pins_file)] phone_to_pins = [tuple(row) for row in csv.reader(pins_file)]
s_import_pins = celery.signature( db_conn = psycopg2.connect(
'import_task.set_pins', database=config.get('DATABASE_NAME'),
(db_configs, phone_to_pins), host=config.get('DATABASE_HOST'),
queue=args.q port=config.get('DATABASE_PORT'),
user=config.get('DATABASE_USER'),
password=config.get('DATABASE_PASSWORD')
) )
result = s_import_pins.apply_async() db_cursor = db_conn.cursor()
logg.debug(f'TASK: {result.id}, STATUS: {result.status}') sql = 'UPDATE account SET password_hash = %s WHERE phone_number = %s'
for element in phone_to_pins:
db_cursor.execute(sql, (element[1], element[0]))
logg.debug(f'Updating account: {element[0]} with: {element[1]}')
db_conn.commit()
db_cursor.close()
db_conn.close()
if __name__ == '__main__': if __name__ == '__main__':

View File

@@ -1,38 +1,37 @@
# standard imports # standard imports
import csv
import json import json
import logging import logging
import os import os
import random import random
import urllib.error import uuid
import urllib.parse from urllib import error, parse, request
import urllib.request
# external imports # external imports
import celery import celery
import psycopg2 import psycopg2
from celery import Task
from chainlib.chain import ChainSpec
from chainlib.eth.address import to_checksum_address from chainlib.eth.address import to_checksum_address
from chainlib.eth.tx import ( from chainlib.eth.tx import raw, unpack
unpack, from cic_types.models.person import Person, generate_metadata_pointer
raw, from hexathon import add_0x, strip_0x
)
from cic_types.models.person import Person # local imports
from cic_types.processor import generate_metadata_pointer
from hexathon import (
strip_0x,
add_0x,
)
logg = logging.getLogger()
celery_app = celery.current_app celery_app = celery.current_app
logg = logging.getLogger()
class ImportTask(celery.Task): class ImportTask(Task):
balances = None balances = None
import_dir = 'out'
count = 0
chain_spec = None
balance_processor = None balance_processor = None
chain_spec: ChainSpec = None
count = 0
db_config: dict = None
import_dir = ''
include_balances = False
max_retries = None max_retries = None
@@ -41,121 +40,70 @@ class MetadataTask(ImportTask):
meta_port = None meta_port = None
meta_path = '' meta_path = ''
meta_ssl = False meta_ssl = False
autoretry_for = ( autoretry_for = (error.HTTPError, OSError,)
urllib.error.HTTPError,
OSError,
)
retry_jitter = True retry_jitter = True
retry_backoff = True retry_backoff = True
retry_backoff_max = 60 retry_backoff_max = 60
@classmethod @classmethod
def meta_url(self): def meta_url(cls):
scheme = 'http' scheme = 'http'
if self.meta_ssl: if cls.meta_ssl:
scheme += 's' scheme += 's'
url = urllib.parse.urlparse('{}://{}:{}/{}'.format(scheme, self.meta_host, self.meta_port, self.meta_path)) url = parse.urlparse(f'{scheme}://{cls.meta_host}:{cls.meta_port}/{cls.meta_path}')
return urllib.parse.urlunparse(url) return parse.urlunparse(url)
def old_address_from_phone(base_path, phone): def old_address_from_phone(base_path: str, phone_number: str):
pidx = generate_metadata_pointer(phone.encode('utf-8'), ':cic.phone') pid_x = generate_metadata_pointer(phone_number.encode('utf-8'), ':cic.phone')
phone_idx_path = os.path.join('{}/phone/{}/{}/{}'.format( phone_idx_path = os.path.join(f'{base_path}/phone/{pid_x[:2]}/{pid_x[2:4]}/{pid_x}')
base_path, with open(phone_idx_path, 'r') as f:
pidx[:2], old_address = f.read()
pidx[2:4],
pidx,
)
)
f = open(phone_idx_path, 'r')
old_address = f.read()
f.close()
return old_address return old_address
@celery_app.task(bind=True, base=MetadataTask) @celery_app.task(bind=True, base=MetadataTask)
def resolve_phone(self, phone): def generate_person_metadata(self, blockchain_address: str, phone_number: str):
identifier = generate_metadata_pointer(phone.encode('utf-8'), ':cic.phone') logg.debug(f'blockchain address: {blockchain_address}')
url = urllib.parse.urljoin(self.meta_url(), identifier) old_blockchain_address = old_address_from_phone(self.import_dir, phone_number)
logg.debug('attempt getting phone pointer at {} for phone {}'.format(url, phone)) old_address_upper = strip_0x(old_blockchain_address).upper()
r = urllib.request.urlopen(url) metadata_path = f'{self.import_dir}/old/{old_address_upper[:2]}/{old_address_upper[2:4]}/{old_address_upper}.json'
address = json.load(r) with open(metadata_path, 'r') as metadata_file:
address = address.replace('"', '') person_metadata = json.load(metadata_file)
logg.debug('address {} for phone {}'.format(address, phone)) person = Person.deserialize(person_metadata)
if not person.identities.get('evm'):
return address person.identities['evm'] = {}
sub_chain_str = f'{self.chain_spec.common_name()}:{self.chain_spec.network_id()}'
person.identities['evm'][sub_chain_str] = [add_0x(blockchain_address)]
@celery_app.task(bind=True, base=MetadataTask) blockchain_address = strip_0x(blockchain_address)
def generate_metadata(self, address, phone): file_path = os.path.join(
old_address = old_address_from_phone(self.import_dir, phone)
logg.debug('address {}'.format(address))
old_address_upper = strip_0x(old_address).upper()
metadata_path = '{}/old/{}/{}/{}.json'.format(
self.import_dir,
old_address_upper[:2],
old_address_upper[2:4],
old_address_upper,
)
f = open(metadata_path, 'r')
o = json.load(f)
f.close()
u = Person.deserialize(o)
if u.identities.get('evm') == None:
u.identities['evm'] = {}
sub_chain_str = '{}:{}'.format(self.chain_spec.common_name(), self.chain_spec.network_id())
u.identities['evm'][sub_chain_str] = [add_0x(address)]
new_address_clean = strip_0x(address)
filepath = os.path.join(
self.import_dir, self.import_dir,
'new', 'new',
new_address_clean[:2].upper(), blockchain_address[:2].upper(),
new_address_clean[2:4].upper(), blockchain_address[2:4].upper(),
new_address_clean.upper() + '.json', blockchain_address.upper() + '.json'
) )
os.makedirs(os.path.dirname(filepath), exist_ok=True) os.makedirs(os.path.dirname(file_path), exist_ok=True)
serialized_person_metadata = person.serialize()
o = u.serialize() with open(file_path, 'w') as metadata_file:
f = open(filepath, 'w') metadata_file.write(json.dumps(serialized_person_metadata))
f.write(json.dumps(o)) logg.debug(f'written person metadata for address: {blockchain_address}')
f.close()
meta_key = generate_metadata_pointer(bytes.fromhex(new_address_clean), ':cic.person')
meta_filepath = os.path.join( meta_filepath = os.path.join(
self.import_dir, self.import_dir,
'meta', 'meta',
'{}.json'.format(new_address_clean.upper()), '{}.json'.format(blockchain_address.upper()),
) )
os.symlink(os.path.realpath(filepath), meta_filepath) os.symlink(os.path.realpath(file_path), meta_filepath)
return blockchain_address
# write ussd data
ussd_data = {
'phone': phone,
'is_activated': 1,
'preferred_language': random.sample(['en', 'sw'], 1)[0],
'is_disabled': False
}
ussd_data_dir = os.path.join(self.import_dir, 'ussd')
ussd_data_file_path = os.path.join(ussd_data_dir, f'{old_address}.json')
f = open(ussd_data_file_path, 'w')
f.write(json.dumps(ussd_data))
f.close()
# write preferences data @celery_app.task(bind=True, base=MetadataTask)
def generate_preferences_data(self, data: tuple):
blockchain_address: str = data[0]
preferences = data[1]
preferences_dir = os.path.join(self.import_dir, 'preferences') preferences_dir = os.path.join(self.import_dir, 'preferences')
preferences_data = { preferences_key = generate_metadata_pointer(bytes.fromhex(strip_0x(blockchain_address)), ':cic.preferences')
'preferred_language': ussd_data['preferred_language']
}
preferences_key = generate_metadata_pointer(bytes.fromhex(new_address_clean[2:]), ':cic.preferences')
preferences_filepath = os.path.join(preferences_dir, 'meta', preferences_key) preferences_filepath = os.path.join(preferences_dir, 'meta', preferences_key)
filepath = os.path.join( filepath = os.path.join(
preferences_dir, preferences_dir,
'new', 'new',
@@ -164,95 +112,95 @@ def generate_metadata(self, address, phone):
preferences_key.upper() + '.json' preferences_key.upper() + '.json'
) )
os.makedirs(os.path.dirname(filepath), exist_ok=True) os.makedirs(os.path.dirname(filepath), exist_ok=True)
with open(filepath, 'w') as preferences_file:
f = open(filepath, 'w') preferences_file.write(json.dumps(preferences))
f.write(json.dumps(preferences_data)) logg.debug(f'written preferences metadata: {preferences} for address: {blockchain_address}')
f.close()
os.symlink(os.path.realpath(filepath), preferences_filepath) os.symlink(os.path.realpath(filepath), preferences_filepath)
return blockchain_address
logg.debug('found metadata {} for phone {}'.format(o, phone))
return address
@celery_app.task(bind=True, base=MetadataTask) @celery_app.task(bind=True, base=MetadataTask)
def opening_balance_tx(self, address, phone, serial): def generate_pins_data(self, blockchain_address: str, phone_number: str):
old_address = old_address_from_phone(self.import_dir, phone) pins_file = f'{self.import_dir}/pins.csv'
file_op = 'a' if os.path.exists(pins_file) else 'w'
with open(pins_file, file_op) as pins_file:
password_hash = uuid.uuid4().hex
pins_file.write(f'{phone_number},{password_hash}\n')
logg.debug(f'written pin data for address: {blockchain_address}')
return blockchain_address
k = to_checksum_address(strip_0x(old_address))
balance = self.balances[k]
logg.debug('found balance {} for address {} phone {}'.format(balance, old_address, phone))
@celery_app.task(bind=True, base=MetadataTask)
def generate_ussd_data(self, blockchain_address: str, phone_number: str):
ussd_data_file = f'{self.import_dir}/ussd_data.csv'
file_op = 'a' if os.path.exists(ussd_data_file) else 'w'
preferred_language = random.sample(["en", "sw"], 1)[0]
preferences = {'preferred_language': preferred_language}
with open(ussd_data_file, file_op) as ussd_data_file:
ussd_data_file.write(f'{phone_number},1,{preferred_language},{False}\n')
logg.debug(f'written ussd data for address: {blockchain_address}')
return blockchain_address, preferences
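The rows written to ussd_data.csv above are read back by import_ussd_data.py at the bottom of this diff; a sketch of the round-trip, with the column order (phone, is_activated, preferred_language, is_disabled) taken from the writer:

    import csv
    import io

    row = '+254700000000,1,sw,False\n'
    phone_number, is_activated, preferred_language, _ = next(csv.reader(io.StringIO(row)))
    status = 2 if int(is_activated) == 1 else 1  # mirrors the UPDATE statement
    assert (phone_number, status, preferred_language) == ('+254700000000', 2, 'sw')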
@celery_app.task(bind=True, base=MetadataTask)
def opening_balance_tx(self, blockchain_address: str, phone_number: str, serial: str):
old_blockchain_address = old_address_from_phone(self.import_dir, phone_number)
address = to_checksum_address(strip_0x(old_blockchain_address))
balance = self.balances[address]
logg.debug(f'found balance: {balance} for address: {address} phone: {phone_number}')
decimal_balance = self.balance_processor.get_decimal_amount(int(balance)) decimal_balance = self.balance_processor.get_decimal_amount(int(balance))
tx_hash_hex, o = self.balance_processor.get_rpc_tx(blockchain_address, decimal_balance, serial)
(tx_hash_hex, o) = self.balance_processor.get_rpc_tx(address, decimal_balance, serial)
tx = unpack(bytes.fromhex(strip_0x(o)), self.chain_spec) tx = unpack(bytes.fromhex(strip_0x(o)), self.chain_spec)
logg.debug('generated tx token value {} to {} tx hash {}'.format(decimal_balance, address, tx_hash_hex)) logg.debug(f'generated tx token value: {decimal_balance}: {blockchain_address} tx hash {tx_hash_hex}')
tx_path = os.path.join(self.import_dir, 'txs', strip_0x(tx_hash_hex))
tx_path = os.path.join( with open(tx_path, 'w') as tx_file:
self.import_dir, tx_file.write(strip_0x(o))
'txs', logg.debug(f'written tx with tx hash: {tx["hash"]} for address: {blockchain_address}')
strip_0x(tx_hash_hex), tx_nonce_path = os.path.join(self.import_dir, 'txs', '.' + str(tx['nonce']))
)
f = open(tx_path, 'w')
f.write(strip_0x(o))
f.close()
tx_nonce_path = os.path.join(
self.import_dir,
'txs',
'.' + str(tx['nonce']),
)
os.symlink(os.path.realpath(tx_path), tx_nonce_path) os.symlink(os.path.realpath(tx_path), tx_nonce_path)
return tx['hash'] return tx['hash']
@celery_app.task(bind=True, base=ImportTask, autoretry_for=(FileNotFoundError,), max_retries=None, @celery_app.task(bind=True, base=MetadataTask)
def resolve_phone(self, phone_number: str):
identifier = generate_metadata_pointer(phone_number.encode('utf-8'), ':cic.phone')
url = parse.urljoin(self.meta_url(), identifier)
logg.debug(f'attempt getting phone pointer at: {url} for phone: {phone_number}')
r = request.urlopen(url)
address = json.load(r)
address = address.replace('"', '')
logg.debug(f'address: {address} for phone: {phone_number}')
return address
@celery_app.task(autoretry_for=(FileNotFoundError,),
bind=True,
base=ImportTask,
max_retries=None,
default_retry_delay=0.1) default_retry_delay=0.1)
def send_txs(self, nonce): def send_txs(self, nonce):
if nonce == self.count + self.balance_processor.nonce_offset:
logg.info('reached nonce {} (offset {} + count {}) exiting'.format(nonce, self.balance_processor.nonce_offset,
self.count))
return
logg.debug('attempt to open symlink for nonce {}'.format(nonce))
tx_nonce_path = os.path.join(
self.import_dir,
'txs',
'.' + str(nonce),
)
f = open(tx_nonce_path, 'r')
tx_signed_raw_hex = f.read()
f.close()
os.unlink(tx_nonce_path)
o = raw(add_0x(tx_signed_raw_hex))
tx_hash_hex = self.balance_processor.conn.do(o)
logg.info('sent nonce {} tx hash {}'.format(nonce, tx_hash_hex)) # tx_signed_raw_hex))
nonce += 1
queue = self.request.delivery_info.get('routing_key') queue = self.request.delivery_info.get('routing_key')
s = celery.signature( if nonce == self.count + self.balance_processor.nonce_offset:
'import_task.send_txs', logg.info(f'reached nonce {nonce} (offset {self.balance_processor.nonce_offset} + count {self.count}).')
[ celery_app.control.broadcast('shutdown', destination=[f'celery@{queue}'])
nonce,
],
queue=queue,
)
s.apply_async()
logg.debug(f'attempt to open symlink for nonce {nonce}')
tx_nonce_path = os.path.join(self.import_dir, 'txs', '.' + str(nonce))
with open(tx_nonce_path, 'r') as tx_nonce_file:
tx_signed_raw_hex = tx_nonce_file.read()
os.unlink(tx_nonce_path)
o = raw(add_0x(tx_signed_raw_hex))
if self.include_balances:
tx_hash_hex = self.balance_processor.conn.do(o)
logg.info(f'sent nonce {nonce} tx hash {tx_hash_hex}')
nonce += 1
s = celery.signature('import_task.send_txs', [nonce], queue=queue)
s.apply_async()
return nonce return nonce
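send_txs walks the nonce-indexed tx files by re-enqueuing itself with nonce + 1 and broadcasts a shutdown to its own worker once it reaches offset + count. The control flow in isolation (a sketch; the task name and directory layout are illustrative):

    import os

    import celery

    celery_app = celery.current_app

    @celery_app.task(bind=True)
    def send_next(self, nonce: int, last_nonce: int, import_dir: str):
        queue = self.request.delivery_info.get('routing_key')
        if nonce == last_nonce:
            # every queued tx has been submitted: stop this dedicated worker
            celery_app.control.broadcast('shutdown', destination=[f'celery@{queue}'])
            return nonce
        with open(os.path.join(import_dir, 'txs', '.' + str(nonce)), 'r') as tx_file:
            tx_signed_raw_hex = tx_file.read()  # submit this to the node, then recurse
        celery.signature('import_task.send_next',
                         [nonce + 1, last_nonce, import_dir], queue=queue).apply_async()
        return nonce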
@celery_app.task @celery_app.task()
def set_pins(config: dict, phone_to_pins: list): def set_pin_data(config: dict, phone_to_pins: list):
# define db connection
db_conn = psycopg2.connect( db_conn = psycopg2.connect(
database=config.get('database'), database=config.get('database'),
host=config.get('host'), host=config.get('host'),
@@ -261,24 +209,17 @@ def set_pins(config: dict, phone_to_pins: list):
password=config.get('password') password=config.get('password')
) )
db_cursor = db_conn.cursor() db_cursor = db_conn.cursor()
sql = 'UPDATE account SET password_hash = %s WHERE phone_number = %s'
# update db
for element in phone_to_pins: for element in phone_to_pins:
sql = 'UPDATE account SET password_hash = %s WHERE phone_number = %s'
db_cursor.execute(sql, (element[1], element[0])) db_cursor.execute(sql, (element[1], element[0]))
logg.debug(f'Updating: {element[0]} with: {element[1]}') logg.debug(f'Updating: {element[0]} with: {element[1]}')
# commit changes
db_conn.commit() db_conn.commit()
# close connections
db_cursor.close() db_cursor.close()
db_conn.close() db_conn.close()
@celery_app.task @celery_app.task
def set_ussd_data(config: dict, ussd_data: dict): def set_ussd_data(config: dict, ussd_data: list):
# define db connection
db_conn = psycopg2.connect( db_conn = psycopg2.connect(
database=config.get('database'), database=config.get('database'),
host=config.get('host'), host=config.get('host'),
@@ -287,20 +228,12 @@ def set_ussd_data(config: dict, ussd_data: dict):
password=config.get('password') password=config.get('password')
) )
db_cursor = db_conn.cursor() db_cursor = db_conn.cursor()
# process ussd_data
account_status = 1
if ussd_data['is_activated'] == 1:
account_status = 2
preferred_language = ussd_data['preferred_language']
phone_number = ussd_data['phone']
sql = 'UPDATE account SET status = %s, preferred_language = %s WHERE phone_number = %s' sql = 'UPDATE account SET status = %s, preferred_language = %s WHERE phone_number = %s'
db_cursor.execute(sql, (account_status, preferred_language, phone_number)) for element in ussd_data:
status = 2 if int(element[1]) == 1 else 1
# commit changes preferred_language = element[2]
phone_number = element[0]
db_cursor.execute(sql, (status, preferred_language, phone_number))
db_conn.commit() db_conn.commit()
# close connections
db_cursor.close() db_cursor.close()
db_conn.close() db_conn.close()

View File

@@ -3,56 +3,61 @@ import argparse
import json import json
import logging import logging
import os import os
import redis
import sys import sys
import time import time
import urllib.request
import uuid import uuid
from urllib import request
from urllib.parse import urlencode from urllib.parse import urlencode
# external imports # external imports
import celery import celery
import confini
import phonenumbers import phonenumbers
import redis
from chainlib.chain import ChainSpec
from cic_types.models.person import Person from cic_types.models.person import Person
from confini import Config
# local imports # local imports
from import_util import get_celery_worker_status from import_util import get_celery_worker_status
default_config_dir = './config'
logging.basicConfig(level=logging.WARNING) logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger() logg = logging.getLogger()
default_config_dir = '/usr/local/etc/cic' arg_parser = argparse.ArgumentParser(description='User import script that registers accounts over USSD and queues seeding tasks.')
# batch size should be slightly below cumulative gas limit worth, eg 80000 gas txs with 8000000 limit is a bit less than 100 batch size
arg_parser.add_argument('--batch-size',
dest='batch_size',
default=100,
type=int,
help='burst size of sending transactions to node')
arg_parser.add_argument('--batch-delay', dest='batch_delay', default=3, type=int, help='seconds delay between batches')
arg_parser.add_argument('-c', type=str, default=default_config_dir, help='config root to use.')
arg_parser.add_argument('--env-prefix',
default=os.environ.get('CONFINI_ENV_PREFIX'),
dest='env_prefix',
type=str,
help='environment prefix for variables to overwrite configuration.')
arg_parser.add_argument('-i', '--chain-spec', type=str, dest='i', help='chain spec')
arg_parser.add_argument('-q', type=str, default='cic-import-ussd', help='celery queue to submit data seeding tasks to.')
arg_parser.add_argument('--redis-db', dest='redis_db', type=int, help='redis db to use for task submission and callback')
arg_parser.add_argument('--redis-host', dest='redis_host', type=str, help='redis host to use for task submission')
arg_parser.add_argument('--redis-port', dest='redis_port', type=int, help='redis port to use for task submission')
arg_parser.add_argument('--ussd-host', dest='ussd_host', type=str,
help="host to ussd app responsible for processing ussd requests.")
arg_parser.add_argument('--ussd-no-ssl', dest='ussd_no_ssl', help='do not use ssl (careful)', action='store_true')
arg_parser.add_argument('--ussd-port', dest='ussd_port', type=str,
help="port to ussd app responsible for processing ussd requests.")
arg_parser.add_argument('-v', help='be verbose', action='store_true')
arg_parser.add_argument('-vv', help='be more verbose', action='store_true')
arg_parser.add_argument('import_dir', default='out', type=str, help='user export directory')
args = arg_parser.parse_args()
argparser = argparse.ArgumentParser() if args.vv:
argparser.add_argument('-c', type=str, default=default_config_dir, help='config file') logging.getLogger().setLevel(logging.DEBUG)
argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='Chain specification string') elif args.v:
argparser.add_argument('--redis-host', dest='redis_host', type=str, help='redis host to use for task submission') logging.getLogger().setLevel(logging.INFO)
argparser.add_argument('--redis-port', dest='redis_port', type=int, help='redis host to use for task submission')
argparser.add_argument('--redis-db', dest='redis_db', type=int, help='redis db to use for task submission and callback')
argparser.add_argument('--batch-size', dest='batch_size', default=100, type=int,
help='burst size of sending transactions to node') # batch size should be slightly below cumulative gas limit worth, eg 80000 gas txs with 8000000 limit is a bit less than 100 batch size
argparser.add_argument('--batch-delay', dest='batch_delay', default=3, type=int, help='seconds delay between batches')
argparser.add_argument('--timeout', default=60.0, type=float, help='Callback timeout')
argparser.add_argument('--ussd-host', dest='ussd_host', type=str,
help="host to ussd app responsible for processing ussd requests.")
argparser.add_argument('--ussd-port', dest='ussd_port', type=str,
help="port to ussd app responsible for processing ussd requests.")
argparser.add_argument('--ussd-no-ssl', dest='ussd_no_ssl', help='do not use ssl (careful)', action='store_true')
argparser.add_argument('-q', type=str, default='cic-eth', help='Task queue')
argparser.add_argument('-v', action='store_true', help='Be verbose')
argparser.add_argument('-vv', action='store_true', help='Be more verbose')
argparser.add_argument('user_dir', type=str, help='path to users export dir tree')
args = argparser.parse_args()
if args.v: config = Config(args.c, args.env_prefix)
logg.setLevel(logging.INFO)
elif args.vv:
logg.setLevel(logging.DEBUG)
config_dir = args.c
config = confini.Config(config_dir, os.environ.get('CONFINI_ENV_PREFIX'))
config.process() config.process()
args_override = { args_override = {
'CIC_CHAIN_SPEC': getattr(args, 'i'), 'CIC_CHAIN_SPEC': getattr(args, 'i'),
@@ -60,44 +65,29 @@ args_override = {
'REDIS_PORT': getattr(args, 'redis_port'), 'REDIS_PORT': getattr(args, 'redis_port'),
'REDIS_DB': getattr(args, 'redis_db'), 'REDIS_DB': getattr(args, 'redis_db'),
} }
config.dict_override(args_override, 'cli') config.dict_override(args_override, 'cli flag')
logg.debug('config loaded from {}:\n{}'.format(args.c, config)) config.censor('PASSWORD', 'DATABASE')
config.censor('PASSWORD', 'SSL')
logg.debug(f'config loaded from {args.c}:\n{config}')
celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL')) old_account_dir = os.path.join(args.import_dir, 'old')
get_celery_worker_status(celery_app=celery_app) os.stat(old_account_dir)
logg.debug(f'found old system data dir: {old_account_dir}')
redis_host = config.get('REDIS_HOST') new_account_dir = os.path.join(args.import_dir, 'new')
redis_port = config.get('REDIS_PORT') os.makedirs(new_account_dir, exist_ok=True)
redis_db = config.get('REDIS_DB') logg.debug(f'created new system data dir: {new_account_dir}')
r = redis.Redis(redis_host, redis_port, redis_db)
ps = r.pubsub() person_metadata_dir = os.path.join(args.import_dir, 'meta')
os.makedirs(person_metadata_dir, exist_ok=True)
logg.debug(f'created person metadata dir: {person_metadata_dir}')
user_new_dir = os.path.join(args.user_dir, 'new') preferences_dir = os.path.join(args.import_dir, 'preferences')
os.makedirs(user_new_dir, exist_ok=True)
ussd_data_dir = os.path.join(args.user_dir, 'ussd')
os.makedirs(ussd_data_dir, exist_ok=True)
preferences_dir = os.path.join(args.user_dir, 'preferences')
os.makedirs(os.path.join(preferences_dir, 'meta'), exist_ok=True) os.makedirs(os.path.join(preferences_dir, 'meta'), exist_ok=True)
logg.debug(f'created preferences metadata dir: {preferences_dir}')
meta_dir = os.path.join(args.user_dir, 'meta') valid_service_codes = config.get('USSD_SERVICE_CODE').split(",")
os.makedirs(meta_dir, exist_ok=True)
user_old_dir = os.path.join(args.user_dir, 'old')
os.stat(user_old_dir)
txs_dir = os.path.join(args.user_dir, 'txs')
os.makedirs(txs_dir, exist_ok=True)
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
chain_str = str(chain_spec)
batch_size = args.batch_size
batch_delay = args.batch_delay
ussd_port = args.ussd_port
ussd_host = args.ussd_host
ussd_no_ssl = args.ussd_no_ssl ussd_no_ssl = args.ussd_no_ssl
if ussd_no_ssl is True: if ussd_no_ssl is True:
ussd_ssl = False ussd_ssl = False
@@ -105,7 +95,17 @@ else:
ussd_ssl = True ussd_ssl = True
def build_ussd_request(phone, host, port, service_code, username, password, ssl=False): celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL'))
get_celery_worker_status(celery_app)
def build_ussd_request(host: str,
password: str,
phone_number: str,
port: str,
service_code: str,
username: str,
ssl: bool = False):
url = 'http' url = 'http'
if ssl: if ssl:
url += 's' url += 's'
@@ -115,16 +115,16 @@ def build_ussd_request(phone, host, port, service_code, username, password, ssl=
url += '/?username={}&password={}'.format(username, password) url += '/?username={}&password={}'.format(username, password)
logg.info('ussd service url {}'.format(url)) logg.info('ussd service url {}'.format(url))
logg.info('ussd phone {}'.format(phone)) logg.info('ussd phone {}'.format(phone_number))
session = uuid.uuid4().hex session = uuid.uuid4().hex
data = { data = {
'sessionId': session, 'sessionId': session,
'serviceCode': service_code, 'serviceCode': service_code,
'phoneNumber': phone, 'phoneNumber': phone_number,
'text': service_code, 'text': service_code,
} }
req = urllib.request.Request(url) req = request.Request(url)
req.method = 'POST' req.method = 'POST'
data_str = urlencode(data) data_str = urlencode(data)
data_bytes = data_str.encode('utf-8') data_bytes = data_str.encode('utf-8')
@@ -134,85 +134,77 @@ def build_ussd_request(phone, host, port, service_code, username, password, ssl=
return req return req
def register_ussd(i, u): def e164_phone_number(phone_number: str):
phone_object = phonenumbers.parse(u.tel) phone_object = phonenumbers.parse(phone_number)
phone = phonenumbers.format_number(phone_object, phonenumbers.PhoneNumberFormat.E164) return phonenumbers.format_number(phone_object, phonenumbers.PhoneNumberFormat.E164)
logg.debug('tel {} {}'.format(u.tel, phone))
req = build_ussd_request(
phone, def register_account(person: Person):
ussd_host, phone_number = e164_phone_number(person.tel)
ussd_port, logg.debug(f'tel: {phone_number}')
config.get('APP_SERVICE_CODE'), req = build_ussd_request(args.ussd_host,
'', '',
'', phone_number,
ussd_ssl args.ussd_port,
) valid_service_codes[0],
response = urllib.request.urlopen(req) '',
ussd_ssl)
response = request.urlopen(req)
response_data = response.read().decode('utf-8') response_data = response.read().decode('utf-8')
state = response_data[:3] logg.debug(f'ussd response: {response_data[4:]}')
out = response_data[4:]
logg.debug('ussd reponse: {}'.format(out))
if __name__ == '__main__': if __name__ == '__main__':
i = 0 i = 0
j = 0 j = 0
for x in os.walk(user_old_dir): for x in os.walk(old_account_dir):
for y in x[2]: for y in x[2]:
if y[len(y) - 5:] != '.json': if y[len(y) - 5:] != '.json':
continue continue
# handle json containing person object
filepath = os.path.join(x[0], y)
f = open(filepath, 'r')
try:
o = json.load(f)
except json.decoder.JSONDecodeError as e:
f.close()
logg.error('load error for {}: {}'.format(y, e))
continue
f.close()
u = Person.deserialize(o)
register_ussd(i, u) file_path = os.path.join(x[0], y)
with open(file_path, 'r') as account_file:
phone_object = phonenumbers.parse(u.tel) try:
phone = phonenumbers.format_number(phone_object, phonenumbers.PhoneNumberFormat.E164) account_data = json.load(account_file)
except json.decoder.JSONDecodeError as e:
s_phone = celery.signature( logg.error('load error for {}: {}'.format(y, e))
'import_task.resolve_phone', continue
[ person = Person.deserialize(account_data)
phone, register_account(person)
], phone_number = e164_phone_number(person.tel)
queue='cic-import-ussd', s_resolve_phone = celery.signature(
'import_task.resolve_phone', [phone_number], queue=args.q
) )
s_meta = celery.signature( s_person_metadata = celery.signature(
'import_task.generate_metadata', 'import_task.generate_person_metadata', [phone_number], queue=args.q
[
phone,
],
queue='cic-import-ussd',
) )
s_balance = celery.signature( s_ussd_data = celery.signature(
'import_task.opening_balance_tx', 'import_task.generate_ussd_data', [phone_number], queue=args.q
[
phone,
i,
],
queue='cic-import-ussd',
) )
s_meta.link(s_balance) s_preferences_metadata = celery.signature(
s_phone.link(s_meta) 'import_task.generate_preferences_data', [], queue=args.q
# block time plus a bit of time for ussd processing )
s_phone.apply_async(countdown=7)
s_pins_data = celery.signature(
'import_task.generate_pins_data', [phone_number], queue=args.q
)
s_opening_balance = celery.signature(
'import_task.opening_balance_tx', [phone_number, i], queue=args.q
)
celery.chain(s_resolve_phone,
s_person_metadata,
s_ussd_data,
s_preferences_metadata,
s_pins_data,
s_opening_balance).apply_async(countdown=7)
i += 1 i += 1
sys.stdout.write('imported {} {}'.format(i, u).ljust(200) + "\r") sys.stdout.write('imported: {} {}'.format(i, person).ljust(200) + "\r\n")
j += 1 j += 1
if j == batch_size: if j == args.batch_size:
time.sleep(batch_delay) time.sleep(args.batch_delay)
j = 0 j = 0
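The chain works because celery prepends each task's return value to the next signature's arguments: resolve_phone returns the blockchain address, so every downstream signature only carries the phone number (and generate_ussd_data returns an (address, preferences) tuple for generate_preferences_data). A toy version of that threading:

    import celery

    app = celery.Celery()

    @app.task
    def resolve(phone_number: str) -> str:
        return '0xdeadbeef'  # stand-in for the resolved blockchain address

    @app.task
    def seed(blockchain_address: str, phone_number: str) -> str:
        # blockchain_address arrives as resolve()'s return value,
        # so the signature below only supplies phone_number
        return blockchain_address

    workflow = celery.chain(resolve.s('+254700000000'), seed.s('+254700000000'))
    # workflow.apply_async(countdown=7)  # countdown leaves room for block + ussd processing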

View File

@@ -1,67 +1,67 @@
# standard imports # standard imports
import argparse import argparse
import json import csv
import logging import logging
import os import os
import psycopg2
# external imports # external imports
import celery
from confini import Config from confini import Config
# local imports # local imports
default_config_dir = './config'
logging.basicConfig(level=logging.WARNING) logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger() logg = logging.getLogger()
default_config_dir = '/usr/local/etc/cic' arg_parser = argparse.ArgumentParser(description='USSD data import script.')
arg_parser.add_argument('-c', type=str, default=default_config_dir, help='config root to use.')
arg_parser.add_argument('--env-prefix',
default=os.environ.get('CONFINI_ENV_PREFIX'),
dest='env_prefix',
type=str,
help='environment prefix for variables to overwrite configuration.')
arg_parser.add_argument('import_dir', default='out', type=str, help='user export directory')
arg_parser.add_argument('-v', help='be verbose', action='store_true')
arg_parser.add_argument('-vv', help='be more verbose', action='store_true')
arg_parser = argparse.ArgumentParser()
arg_parser.add_argument('-c', type=str, default=default_config_dir, help='config file')
arg_parser.add_argument('-q', type=str, default='cic-import-ussd', help='Task queue')
arg_parser.add_argument('-v', action='store_true', help='Be verbose')
arg_parser.add_argument('-vv', action='store_true', help='Be more verbose')
arg_parser.add_argument('user_dir', type=str, help='path to users export dir tree')
args = arg_parser.parse_args() args = arg_parser.parse_args()
if args.v: if args.vv:
logg.setLevel(logging.INFO) logging.getLogger().setLevel(logging.DEBUG)
elif args.vv: elif args.v:
logg.setLevel(logging.DEBUG) logging.getLogger().setLevel(logging.INFO)
config_dir = args.c config = Config(args.c, args.env_prefix)
config = Config(config_dir, os.environ.get('CONFINI_ENV_PREFIX'))
config.process() config.process()
logg.debug('config loaded from {}:\n{}'.format(args.c, config)) config.censor('PASSWORD', 'DATABASE')
logg.debug(f'config loaded from {args.c}:\n{config}')
ussd_data_dir = os.path.join(args.user_dir, 'ussd')
db_configs = { def main():
'database': config.get('DATABASE_NAME'), with open(f'{args.import_dir}/ussd_data.csv') as ussd_data_file:
'host': config.get('DATABASE_HOST'), ussd_data = [tuple(row) for row in csv.reader(ussd_data_file)]
'port': config.get('DATABASE_PORT'),
'user': config.get('DATABASE_USER'), db_conn = psycopg2.connect(
'password': config.get('DATABASE_PASSWORD') database=config.get('DATABASE_NAME'),
} host=config.get('DATABASE_HOST'),
celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL')) port=config.get('DATABASE_PORT'),
user=config.get('DATABASE_USER'),
password=config.get('DATABASE_PASSWORD')
)
db_cursor = db_conn.cursor()
sql = 'UPDATE account SET status = %s, preferred_language = %s WHERE phone_number = %s'
for element in ussd_data:
status = 2 if int(element[1]) == 1 else 1
preferred_language = element[2]
phone_number = element[0]
db_cursor.execute(sql, (status, preferred_language, phone_number))
logg.debug(f'Updating account: {phone_number} with: preferred language: {preferred_language} status: {status}.')
db_conn.commit()
db_cursor.close()
db_conn.close()
if __name__ == '__main__': if __name__ == '__main__':
for x in os.walk(ussd_data_dir): main()
for y in x[2]:
if y[len(y) - 5:] == '.json':
filepath = os.path.join(x[0], y)
f = open(filepath, 'r')
try:
ussd_data = json.load(f)
logg.debug(f'LOADING USSD DATA: {ussd_data}')
except json.decoder.JSONDecodeError as e:
f.close()
logg.error('load error for {}: {}'.format(y, e))
continue
f.close()
s_set_ussd_data = celery.signature(
'import_task.set_ussd_data',
[db_configs, ussd_data]
)
s_set_ussd_data.apply_async(queue='cic-import-ussd')

View File

@@ -1,27 +1,4 @@
[app] [app]
ALLOWED_IP=0.0.0.0/0 allowed_ip=0.0.0.0/0
LOCALE_FALLBACK=en max_body_length=1024
LOCALE_PATH=/usr/src/cic-ussd/var/lib/locale/ password_pepper=
MAX_BODY_LENGTH=1024
PASSWORD_PEPPER=QYbzKff6NhiQzY3ygl2BkiKOpER8RE/Upqs/5aZWW+I=
SERVICE_CODE=*483*46#
[phone_number]
REGION=KE
[ussd]
MENU_FILE=/usr/src/data/ussd_menu.json
user =
pass =
[statemachine]
STATES=/usr/src/cic-ussd/states/
TRANSITIONS=/usr/src/cic-ussd/transitions/
[client]
host =
port =
ssl =
[keystore]
file_path = keystore/UTC--2021-01-08T17-18-44.521011372Z--eb3907ecad74a0013c259d5874ae7f22dcbcc95c

View File

@@ -1,10 +1,10 @@
[database] [database]
NAME=sempo name=cic_ussd
USER=postgres user=postgres
PASSWORD= password=
HOST=localhost host=localhost
PORT=5432 port=5432
ENGINE=postgresql engine=postgresql
DRIVER=psycopg2 driver=psycopg2
DEBUG=0 debug=0
POOL_SIZE=1 pool_size=1

View File

@@ -0,0 +1,5 @@
[ussd]
menu_file=data/ussd_menu.json
service_code=*483*46#,*483*061#,*384*96#
user =
pass =
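service_code now holds a comma-separated list; consumers split it and treat the first entry as the canonical code, as verify.py does at the bottom of this diff. A sketch, assuming the [ussd] section above was loaded with confini:

    from confini import Config

    config = Config('./config')
    config.process()
    valid_service_codes = config.get('USSD_SERVICE_CODE').split(',')
    assert valid_service_codes[0] == '*483*46#'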

View File

@@ -1,91 +0,0 @@
# standard imports
import argparse
import json
import logging
import os
import uuid
# third-party imports
import bcrypt
import celery
import confini
import phonenumbers
import random
from cic_types.models.person import Person
from cryptography.fernet import Fernet
# local imports
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
script_dir = os.path.realpath(os.path.dirname(__file__))
default_config_dir = os.environ.get('CONFINI_DIR', os.path.join(script_dir, 'config'))
arg_parser = argparse.ArgumentParser()
arg_parser.add_argument('-c', type=str, default=default_config_dir, help='Config dir')
arg_parser.add_argument('-v', action='store_true', help='Be verbose')
arg_parser.add_argument('-vv', action='store_true', help='Be more verbose')
arg_parser.add_argument('--userdir', type=str, help='path to users export dir tree')
arg_parser.add_argument('pins_dir', type=str, help='path to pin export dir tree')
args = arg_parser.parse_args()
if args.v:
logg.setLevel(logging.INFO)
elif args.vv:
logg.setLevel(logging.DEBUG)
config = confini.Config(args.c, os.environ.get('CONFINI_ENV_PREFIX'))
config.process()
logg.info('loaded config\n{}'.format(config))
celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL'))
user_dir = args.userdir
pins_dir = args.pins_dir
def generate_password_hash():
key = Fernet.generate_key()
fnt = Fernet(key)
pin = str(random.randint(1000, 9999))
return fnt.encrypt(bcrypt.hashpw(pin.encode('utf-8'), bcrypt.gensalt())).decode()
user_old_dir = os.path.join(user_dir, 'old')
logg.debug(f'reading user data from: {user_old_dir}')
pins_file = open(f'{pins_dir}/pins.csv', 'w')
if __name__ == '__main__':
for x in os.walk(user_old_dir):
for y in x[2]:
# skip non-json files
if y[len(y) - 5:] != '.json':
continue
# define file path for
filepath = None
if y[:15] != '_ussd_data.json':
filepath = os.path.join(x[0], y)
f = open(filepath, 'r')
try:
o = json.load(f)
except json.decoder.JSONDecodeError as e:
f.close()
logg.error('load error for {}: {}'.format(y, e))
continue
f.close()
u = Person.deserialize(o)
phone_object = phonenumbers.parse(u.tel)
phone = phonenumbers.format_number(phone_object, phonenumbers.PhoneNumberFormat.E164)
password_hash = uuid.uuid4().hex
pins_file.write(f'{phone},{password_hash}\n')
logg.info(f'Writing phone: {phone}, password_hash: {password_hash}')
pins_file.close()

View File

@@ -0,0 +1,55 @@
#!/usr/bin/env bash
echo "Creating seed data..."
python create_import_users.py -vv --dir "$IMPORT_DIR" "$ACCOUNT_COUNT"
wait $!
echo "Purge tasks from celery worker"
celery -A cic_ussd.import_task purge -Q "$CELERY_QUEUE" --broker redis://"$REDIS_HOST":"$REDIS_PORT" -f
echo "Start celery work and import balance job"
if [ "$INCLUDE_BALANCES" != "y" ]
then
echo "Running worker without opening balance transactions"
TARGET_TX_COUNT=$ACCOUNT_COUNT
python cic_ussd/import_balance.py -vv -c "$CONFIG" -p "$ETH_PROVIDER" -r "$CIC_REGISTRY_ADDRESS" --token-symbol "$TOKEN_SYMBOL" -y "$KEYSTORE_PATH" "$IMPORT_DIR" &
else
echo "Running worker with opening balance transactions"
TARGET_TX_COUNT=$((ACCOUNT_COUNT*2))
python cic_ussd/import_balance.py -vv -c "$CONFIG" -p "$ETH_PROVIDER" -r "$CIC_REGISTRY_ADDRESS" --include-balances --token-symbol "$TOKEN_SYMBOL" -y "$KEYSTORE_PATH" "$IMPORT_DIR" &
fi
until [ -f ./cic-import-ussd.pid ]
do
echo "Polling for celery worker pid file..."
done
IMPORT_BALANCE_JOB=$(<cic-import-ussd.pid)
echo "Start import users job"
if [ "$USSD_SSL" == "y" ]
then
echo "Targeting secure ussd-user server"
python cic_ussd/import_users.py -vv -c "$CONFIG" --ussd-host "$USSD_HOST" --ussd-port "$USSD_PORT" "$IMPORT_DIR"
else
python cic_ussd/import_users.py -vv -c "$CONFIG" --ussd-host "$USSD_HOST" --ussd-port "$USSD_PORT" --ussd-no-ssl "$IMPORT_DIR"
fi
echo "Waiting for import balance job to complete ..."
tail --pid="$IMPORT_BALANCE_JOB" -f /dev/null
set -e
echo "Importing pins"
python cic_ussd/import_pins.py -c "$CONFIG" -vv "$IMPORT_DIR"
set +e
wait $!
set -e
echo "Importing ussd data"
python cic_ussd/import_ussd_data.py -c "$CONFIG" -vv "$IMPORT_DIR"
set +e
wait $!
echo "Importing person metadata"
node cic_meta/import_meta.js "$IMPORT_DIR" "$ACCOUNT_COUNT"
echo "Import preferences metadata"
node cic_meta/import_meta_preferences.js "$IMPORT_DIR" "$ACCOUNT_COUNT"
CIC_NOTIFY_DATABASE=postgres://$DATABASE_USER:$DATABASE_PASSWORD@$DATABASE_HOST:$DATABASE_PORT/$NOTIFY_DATABASE_NAME
NOTIFICATION_COUNT=$(psql -qtA "$CIC_NOTIFY_DATABASE" -c 'SELECT COUNT(message) FROM notification WHERE message IS NOT NULL')
while [[ "$NOTIFICATION_COUNT" < "$TARGET_TX_COUNT" ]]
do
NOTIFICATION_COUNT=$(psql -qtA "$CIC_NOTIFY_DATABASE" -c 'SELECT COUNT(message) FROM notification WHERE message IS NOT NULL')
sleep 5
echo "Notification count is: ${NOTIFICATION_COUNT}. Checking after 5 ..."
done
python verify.py -c "$CONFIG" -v -p "$ETH_PROVIDER" -r "$CIC_REGISTRY_ADDRESS" --exclude "$EXCLUSIONS" --token-symbol "$TOKEN_SYMBOL" "$IMPORT_DIR"

View File

@@ -187,9 +187,10 @@ def send_ussd_request(address, data_dir):
phone = p.tel phone = p.tel
session = uuid.uuid4().hex session = uuid.uuid4().hex
valid_service_codes = config.get('USSD_SERVICE_CODE').split(",")
data = { data = {
'sessionId': session, 'sessionId': session,
'serviceCode': config.get('APP_SERVICE_CODE'), 'serviceCode': valid_service_codes[0],
'phoneNumber': phone, 'phoneNumber': phone,
'text': '', 'text': '',
} }