From 92cc6a3f2786373bacb186b2dff4cc40342fd25e Mon Sep 17 00:00:00 2001 From: Philip Wafula Date: Sun, 29 Aug 2021 09:55:47 +0000 Subject: [PATCH] Consolidated ussd dataseeding script --- apps/cic-notify/cic_notify/tasks/sms/db.py | 6 +- apps/cic-notify/cic_notify/tasks/sms/log.py | 7 +- apps/cic-ussd/cic_ussd/account/transaction.py | 11 +- apps/cic-ussd/cic_ussd/metadata/base.py | 2 +- .../cic-ussd/cic_ussd/session/ussd_session.py | 2 +- .../cic_ussd/tasks/callback_handler.py | 14 +- apps/cic-ussd/cic_ussd/tasks/processor.py | 9 +- apps/data-seeding/cic_ussd/import_balance.py | 194 +++++----- apps/data-seeding/cic_ussd/import_pins.py | 72 ++-- apps/data-seeding/cic_ussd/import_task.py | 335 +++++++----------- apps/data-seeding/cic_ussd/import_users.py | 250 +++++++------ .../data-seeding/cic_ussd/import_ussd_data.py | 90 ++--- apps/data-seeding/config/app.ini | 29 +- apps/data-seeding/config/database.ini | 18 +- apps/data-seeding/config/ussd.ini | 5 + apps/data-seeding/create_import_pins.py | 91 ----- apps/data-seeding/import_ussd.sh | 55 +++ apps/data-seeding/verify.py | 3 +- 18 files changed, 520 insertions(+), 673 deletions(-) create mode 100644 apps/data-seeding/config/ussd.ini delete mode 100644 apps/data-seeding/create_import_pins.py create mode 100644 apps/data-seeding/import_ussd.sh diff --git a/apps/cic-notify/cic_notify/tasks/sms/db.py b/apps/cic-notify/cic_notify/tasks/sms/db.py index ca623154..e21b2b79 100644 --- a/apps/cic-notify/cic_notify/tasks/sms/db.py +++ b/apps/cic-notify/cic_notify/tasks/sms/db.py @@ -11,12 +11,12 @@ celery_app = celery.current_app @celery_app.task -def persist_notification(recipient, message): +def persist_notification(message, recipient): """ - :param recipient: - :type recipient: :param message: :type message: + :param recipient: + :type recipient: :return: :rtype: """ diff --git a/apps/cic-notify/cic_notify/tasks/sms/log.py b/apps/cic-notify/cic_notify/tasks/sms/log.py index 30de7741..05a6444a 100644 --- a/apps/cic-notify/cic_notify/tasks/sms/log.py +++ b/apps/cic-notify/cic_notify/tasks/sms/log.py @@ -11,12 +11,13 @@ local_logg = logging.getLogger(__name__) @celery_app.task -def log(recipient, message): +def log(message, recipient): """ - :param recipient: - :type recipient: + :param message: :type message: + :param recipient: + :type recipient: :return: :rtype: """ diff --git a/apps/cic-ussd/cic_ussd/account/transaction.py b/apps/cic-ussd/cic_ussd/account/transaction.py index 02b22270..980aae42 100644 --- a/apps/cic-ussd/cic_ussd/account/transaction.py +++ b/apps/cic-ussd/cic_ussd/account/transaction.py @@ -1,5 +1,6 @@ # standard import import decimal +import json import logging from typing import Dict, Tuple @@ -8,6 +9,8 @@ from cic_eth.api import Api from sqlalchemy.orm.session import Session # local import +from cic_ussd.account.chain import Chain +from cic_ussd.account.tokens import get_cached_default_token from cic_ussd.db.models.account import Account from cic_ussd.db.models.base import SessionBase from cic_ussd.error import UnknownUssdRecipient @@ -59,7 +62,9 @@ def from_wei(value: int) -> float: :return: SRF equivalent of value in Wei :rtype: float """ - value = float(value) / 1e+6 + cached_token_data = json.loads(get_cached_default_token(Chain.spec.__str__())) + token_decimals: int = cached_token_data.get('decimals') + value = float(value) / (10**token_decimals) return truncate(value=value, decimals=2) @@ -70,7 +75,9 @@ def to_wei(value: int) -> int: :return: Wei equivalent of value in SRF :rtype: int """ - return int(value * 1e+6) + 
cached_token_data = json.loads(get_cached_default_token(Chain.spec.__str__())) + token_decimals: int = cached_token_data.get('decimals') + return int(value * (10**token_decimals)) def truncate(value: float, decimals: int): diff --git a/apps/cic-ussd/cic_ussd/metadata/base.py b/apps/cic-ussd/cic_ussd/metadata/base.py index f184adf6..c422ee7a 100644 --- a/apps/cic-ussd/cic_ussd/metadata/base.py +++ b/apps/cic-ussd/cic_ussd/metadata/base.py @@ -44,7 +44,7 @@ class MetadataRequestsHandler(Metadata): def create(self, data: Union[Dict, str]): """""" - data = json.dumps(data) + data = json.dumps(data).encode('utf-8') result = make_request(method='POST', url=self.url, data=data, headers=self.headers) error_handler(result=result) diff --git a/apps/cic-ussd/cic_ussd/session/ussd_session.py b/apps/cic-ussd/cic_ussd/session/ussd_session.py index c5ec9fbb..9df54afa 100644 --- a/apps/cic-ussd/cic_ussd/session/ussd_session.py +++ b/apps/cic-ussd/cic_ussd/session/ussd_session.py @@ -146,7 +146,7 @@ def create_ussd_session( ) -def update_ussd_session(ussd_session: UssdSession, +def update_ussd_session(ussd_session: DbUssdSession, user_input: str, state: str, data: Optional[dict] = None) -> UssdSession: diff --git a/apps/cic-ussd/cic_ussd/tasks/callback_handler.py b/apps/cic-ussd/cic_ussd/tasks/callback_handler.py index c8350996..f3a80caf 100644 --- a/apps/cic-ussd/cic_ussd/tasks/callback_handler.py +++ b/apps/cic-ussd/cic_ussd/tasks/callback_handler.py @@ -138,26 +138,14 @@ def transaction_balances_callback(self, result: list, param: dict, status_code: balances_data = result[0] available_balance = calculate_available_balance(balances_data) transaction = param - blockchain_address = transaction.get('blockchain_address') transaction['available_balance'] = available_balance queue = self.request.delivery_info.get('routing_key') - s_preferences_metadata = celery.signature( - 'cic_ussd.tasks.metadata.query_preferences_metadata', [blockchain_address], queue=queue - ) s_process_account_metadata = celery.signature( 'cic_ussd.tasks.processor.parse_transaction', [transaction], queue=queue ) s_notify_account = celery.signature('cic_ussd.tasks.notifications.transaction', queue=queue) - - if transaction.get('transaction_type') == 'transfer': - celery.chain(s_preferences_metadata, s_process_account_metadata, s_notify_account).apply_async() - - if transaction.get('transaction_type') == 'tokengift': - s_process_account_metadata = celery.signature( - 'cic_ussd.tasks.processor.parse_transaction', [{}, transaction], queue=queue - ) - celery.chain(s_process_account_metadata, s_notify_account).apply_async() + celery.chain(s_process_account_metadata, s_notify_account).apply_async() @celery_app.task diff --git a/apps/cic-ussd/cic_ussd/tasks/processor.py b/apps/cic-ussd/cic_ussd/tasks/processor.py index 287dbc58..8a188f54 100644 --- a/apps/cic-ussd/cic_ussd/tasks/processor.py +++ b/apps/cic-ussd/cic_ussd/tasks/processor.py @@ -8,6 +8,7 @@ import i18n from chainlib.hash import strip_0x # local imports +from cic_ussd.account.metadata import get_cached_preferred_language from cic_ussd.account.statement import get_cached_statement from cic_ussd.account.transaction import aux_transaction_data, validate_transaction_account from cic_ussd.cache import cache_data, cache_data_key @@ -58,19 +59,17 @@ def cache_statement(parsed_transaction: dict, querying_party: str): @celery_app.task -def parse_transaction(preferences: dict, transaction: dict) -> dict: +def parse_transaction(transaction: dict) -> dict: """This function parses transaction 
objects and collates all relevant data for system use i.e:
     - An account's set preferred language.
     - Account identifier that facilitates notification.
     - Contextual tags i.e action and direction tags.
-    :param preferences: An account's set preferences.
-    :type preferences: dict
     :param transaction: Transaction object.
     :type transaction: dict
     :return: Transaction object with contextual data for use in the system.
     :rtype: dict
     """
-    preferred_language = preferences.get('preferred_language')
+    preferred_language = get_cached_preferred_language(transaction.get('blockchain_address'))
     if not preferred_language:
         preferred_language = i18n.config.get('fallback')
     transaction['preferred_language'] = preferred_language
@@ -83,6 +82,8 @@ def parse_transaction(preferences: dict, transaction: dict) -> dict:
         alt_account = session.query(Account).filter_by(blockchain_address=alt_blockchain_address).first()
         if alt_account:
             transaction['alt_metadata_id'] = alt_account.standard_metadata_id()
+        else:
+            transaction['alt_metadata_id'] = 'GRASSROOTS ECONOMICS'
     transaction['metadata_id'] = account.standard_metadata_id()
     transaction['phone_number'] = account.phone_number
     session.close()
diff --git a/apps/data-seeding/cic_ussd/import_balance.py b/apps/data-seeding/cic_ussd/import_balance.py
index df5bdfe4..9f974e70 100644
--- a/apps/data-seeding/cic_ussd/import_balance.py
+++ b/apps/data-seeding/cic_ussd/import_balance.py
@@ -1,64 +1,61 @@
-# standard imports
 import argparse
 import logging
-import sys
 import os
+import sys
 
 # external imports
 import celery
-import confini
-import redis
 from chainlib.chain import ChainSpec
 from chainlib.eth.address import to_checksum_address
 from chainlib.eth.connection import EthHTTPConnection
+from confini import Config
 from crypto_dev_signer.eth.signer import ReferenceSigner as EIP155Signer
 from crypto_dev_signer.keystore.dict import DictKeystore
 
 # local imports
-from import_task import ImportTask, MetadataTask
 from import_util import BalanceProcessor, get_celery_worker_status
+from import_task import ImportTask, MetadataTask
 
-logging.basicConfig(level=logging.WARNING)
+default_config_dir = './config'
 logg = logging.getLogger()
-config_dir = './config'
 
+arg_parser = argparse.ArgumentParser(description='Daemon worker that handles data seeding tasks.')
+arg_parser.add_argument('-c', type=str, default=default_config_dir, help='config root to use.')
+arg_parser.add_argument('--env-prefix',
+                        default=os.environ.get('CONFINI_ENV_PREFIX'),
+                        dest='env_prefix',
+                        type=str,
+                        help='environment prefix for variables to overwrite configuration.')
+arg_parser.add_argument('--head', action='store_true', help='start at current block height (overrides --offset)')
+arg_parser.add_argument('-i', '--chain-spec', type=str, dest='i', help='chain spec')
+arg_parser.add_argument('--include-balances', dest='include_balances', help='include opening balance transactions',
+                        action='store_true')
+arg_parser.add_argument('--meta-host', dest='meta_host', type=str, help='metadata server host')
+arg_parser.add_argument('--meta-port', dest='meta_port', type=int, help='metadata server port')
+arg_parser.add_argument('-p', '--provider', dest='p', type=str, help='chain rpc provider address')
+arg_parser.add_argument('-q', type=str, default='cic-import-ussd', help='celery queue to submit data seeding tasks to.')
+arg_parser.add_argument('-r', '--registry-address', type=str, dest='r', help='CIC Registry address')
+arg_parser.add_argument('--redis-db', dest='redis_db', type=int, help='redis db to use for task submission and callback')
+arg_parser.add_argument('--redis-host', dest='redis_host', type=str, help='redis host to use for task submission')
+arg_parser.add_argument('--redis-port', dest='redis_port', type=int, help='redis port to use for task submission')
+arg_parser.add_argument('--token-symbol', default='GFT', type=str, dest='token_symbol',
+                        help='Token symbol to use for transactions')
+arg_parser.add_argument('-v', help='be verbose', action='store_true')
+arg_parser.add_argument('-vv', help='be more verbose', action='store_true')
+arg_parser.add_argument('-y', '--key-file', dest='y', type=str, help='Ethereum keystore file to use for signing')
+arg_parser.add_argument('--offset', type=int, default=0, help='block offset to start syncer from')
+arg_parser.add_argument('--old-chain-spec', type=str, dest='old_chain_spec', default='evm:oldchain:1',
+                        help='chain spec')
+arg_parser.add_argument('import_dir', default='out', type=str, help='user export directory')
+args = arg_parser.parse_args()
 
-argparser = argparse.ArgumentParser(description='daemon that monitors transactions in new blocks')
-argparser.add_argument('-p', '--provider', dest='p', type=str, help='chain rpc provider address')
-argparser.add_argument('-y', '--key-file', dest='y', type=str, help='Ethereum keystore file to use for signing')
-argparser.add_argument('-c', type=str, default=config_dir, help='config root to use')
-argparser.add_argument('--old-chain-spec', type=str, dest='old_chain_spec', default='evm:oldchain:1', help='chain spec')
-argparser.add_argument('-i', '--chain-spec', type=str, dest='i', help='chain spec')
-argparser.add_argument('-r', '--registry-address', type=str, dest='r', help='CIC Registry address')
-argparser.add_argument('--meta-host', dest='meta_host', type=str, help='metadata server host')
-argparser.add_argument('--meta-port', dest='meta_port', type=int, help='metadata server host')
-argparser.add_argument('--redis-host', dest='redis_host', type=str, help='redis host to use for task submission')
-argparser.add_argument('--redis-port', dest='redis_port', type=int, help='redis host to use for task submission')
-argparser.add_argument('--redis-db', dest='redis_db', type=int, help='redis db to use for task submission and callback')
-argparser.add_argument('--token-symbol', default='GFT', type=str, dest='token_symbol',
-                       help='Token symbol to use for transactions')
-argparser.add_argument('--head', action='store_true', help='start at current block height (overrides --offset)')
-argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str,
-                       help='environment prefix for variables to overwrite configuration')
-argparser.add_argument('-q', type=str, default='cic-import-ussd', help='celery queue to submit transaction tasks to')
-argparser.add_argument('--offset', type=int, default=0, help='block offset to start syncer from')
-argparser.add_argument('-v', help='be verbose', action='store_true')
-argparser.add_argument('-vv', help='be more verbose', action='store_true')
-argparser.add_argument('user_dir', default='out', type=str, help='user export directory')
-args = argparser.parse_args(sys.argv[1:])
 
-if args.v:
+if args.vv:
+    logging.getLogger().setLevel(logging.DEBUG)
+elif args.v:
     logging.getLogger().setLevel(logging.INFO)
-elif args.vv:
-    logging.getLogger().setLevel(logging.DEBUG)
-
-config_dir = os.path.join(args.c)
-os.makedirs(config_dir, 0o777, True)
-config = confini.Config(config_dir, args.env_prefix)
+config = Config(args.c, args.env_prefix)
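+# config values are read from the .ini files under the config root; per the
+# --env-prefix help text above, matching environment variables (under the
+# given prefix, if any) overwrite the file values once process() runs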
 config.process()
-
-# override args
 args_override = {
     'CIC_CHAIN_SPEC': getattr(args, 'i'),
     'ETH_PROVIDER': getattr(args, 'p'),
@@ -73,88 +70,76 @@ args_override = {
 config.dict_override(args_override, 'cli flag')
 config.censor('PASSWORD', 'DATABASE')
 config.censor('PASSWORD', 'SSL')
-logg.debug('config loaded from {}:\n{}'.format(config_dir, config))
+logg.debug(f'config loaded from {args.c}:\n{config}')
 
-redis_host = config.get('REDIS_HOST')
-redis_port = config.get('REDIS_PORT')
-redis_db = config.get('REDIS_DB')
-r = redis.Redis(redis_host, redis_port, redis_db)
+db_config = {
+    'database': config.get('DATABASE_NAME'),
+    'host': config.get('DATABASE_HOST'),
+    'port': config.get('DATABASE_PORT'),
+    'user': config.get('DATABASE_USER'),
+    'password': config.get('DATABASE_PASSWORD')
+}
+ImportTask.db_config = db_config
 
-# create celery apps
-celery_app = celery.Celery(backend=config.get('CELERY_RESULT_URL'), broker=config.get('CELERY_BROKER_URL'))
-status = get_celery_worker_status(celery_app=celery_app)
-
-signer_address = None
 keystore = DictKeystore()
-if args.y is not None:
-    logg.debug('loading keystore file {}'.format(args.y))
-    signer_address = keystore.import_keystore_file(args.y)
-    logg.debug('now have key for signer address {}'.format(signer_address))
-
-# define signer
+if not os.path.isfile(args.y):
+    sys.exit(f'keystore file {args.y} does not exist')
+logg.debug(f'loading keystore file {args.y}')
+signer_address = keystore.import_keystore_file(args.y)
+logg.debug(f'now have key for signer address {signer_address}')
 signer = EIP155Signer(keystore)
 
-queue = args.q
-chain_str = config.get('CIC_CHAIN_SPEC')
-block_offset = 0
-if args.head:
-    block_offset = -1
-else:
-    block_offset = args.offset
+block_offset = -1 if args.head else args.offset
 
+chain_str = config.get('CIC_CHAIN_SPEC')
 chain_spec = ChainSpec.from_chain_str(chain_str)
+ImportTask.chain_spec = chain_spec
 old_chain_spec_str = args.old_chain_spec
 old_chain_spec = ChainSpec.from_chain_str(old_chain_spec_str)
 
-user_dir = args.user_dir  # user_out_dir from import_users.py
-
-token_symbol = args.token_symbol
-
 MetadataTask.meta_host = config.get('META_HOST')
 MetadataTask.meta_port = config.get('META_PORT')
-ImportTask.chain_spec = chain_spec
+
+txs_dir = os.path.join(args.import_dir, 'txs')
+os.makedirs(txs_dir, exist_ok=True)
+sys.stdout.write(f'created txs dir: {txs_dir}\n')
+
+celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL'))
+get_celery_worker_status(celery_app)
 
 
 def main():
     conn = EthHTTPConnection(config.get('ETH_PROVIDER'))
-
-    ImportTask.balance_processor = BalanceProcessor(conn, chain_spec, config.get('CIC_REGISTRY_ADDRESS'),
-                                                    signer_address, signer)
-    ImportTask.balance_processor.init(token_symbol)
-
-    # TODO get decimals from token
+    ImportTask.balance_processor = BalanceProcessor(conn,
+                                                    chain_spec,
+                                                    config.get('CIC_REGISTRY_ADDRESS'),
+                                                    signer_address,
+                                                    signer)
+    ImportTask.balance_processor.init(args.token_symbol)
     balances = {}
-    f = open('{}/balances.csv'.format(user_dir, 'r'))
-    remove_zeros = 10 ** 6
-    i = 0
-    while True:
-        l = f.readline()
-        if l is None:
-            break
-        r = l.split(',')
-        try:
-            address = to_checksum_address(r[0])
-            sys.stdout.write('loading balance {} {} {}'.format(i, address, r[1]).ljust(200) + "\r")
-        except ValueError:
-            break
-        balance = int(int(r[1].rstrip()) / remove_zeros)
-        balances[address] = balance
-        i += 1
-
-    f.close()
-
+    accuracy = 10 ** 6
+    count = 0
+    with open(f'{args.import_dir}/balances.csv', 'r') as balances_file:
+        while True:
+            line = balances_file.readline()
+            # an empty string signals EOF from readline()
+            if not line or line is 
None: + break + balance_data = line.split(',') + try: + blockchain_address = to_checksum_address(balance_data[0]) + logg.info( + 'loading balance: {} {} {}'.format(count, blockchain_address, balance_data[1].ljust(200) + "\r")) + except ValueError: + break + balance = int(int(balance_data[1].rstrip()) / accuracy) + balances[blockchain_address] = balance + count += 1 ImportTask.balances = balances - ImportTask.count = i - ImportTask.import_dir = user_dir - - s = celery.signature( - 'import_task.send_txs', - [ - MetadataTask.balance_processor.nonce_offset, - ], - queue=queue, - ) - s.apply_async() + ImportTask.count = count + ImportTask.include_balances = args.include_balances is True + ImportTask.import_dir = args.import_dir + s_send_txs = celery.signature( + 'import_task.send_txs', [ImportTask.balance_processor.nonce_offset], queue=args.q) + s_send_txs.apply_async() argv = ['worker'] if args.vv: @@ -165,6 +150,7 @@ def main(): argv.append(args.q) argv.append('-n') argv.append(args.q) + argv.append(f'--pidfile={args.q}.pid') celery_app.worker_main(argv) diff --git a/apps/data-seeding/cic_ussd/import_pins.py b/apps/data-seeding/cic_ussd/import_pins.py index 827d3840..047faf0a 100644 --- a/apps/data-seeding/cic_ussd/import_pins.py +++ b/apps/data-seeding/cic_ussd/import_pins.py @@ -1,71 +1,63 @@ -# standard import +# standard imports import argparse import csv import logging import os +import psycopg2 -# third-party imports -import celery -import confini +# external imports +from confini import Config # local imports -from import_util import get_celery_worker_status + +default_config_dir = './config' logging.basicConfig(level=logging.WARNING) logg = logging.getLogger() -default_config_dir = './config' - -arg_parser = argparse.ArgumentParser() -arg_parser.add_argument('-c', type=str, default=default_config_dir, help='config root to use') +arg_parser = argparse.ArgumentParser(description='Pins import script.') +arg_parser.add_argument('-c', type=str, default=default_config_dir, help='config root to use.') arg_parser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, - help='environment prefix for variables to overwrite configuration') -arg_parser.add_argument('-q', type=str, default='cic-import-ussd', help='celery queue to submit transaction tasks to') + help='environment prefix for variables to overwrite configuration.') +arg_parser.add_argument('import_dir', default='out', type=str, help='user export directory') arg_parser.add_argument('-v', help='be verbose', action='store_true') arg_parser.add_argument('-vv', help='be more verbose', action='store_true') -arg_parser.add_argument('pins_dir', default='out', type=str, help='user export directory') + args = arg_parser.parse_args() -# set log levels -if args.v: - logg.setLevel(logging.INFO) -elif args.vv: - logg.setLevel(logging.DEBUG) +if args.vv: + logging.getLogger().setLevel(logging.DEBUG) +elif args.v: + logging.getLogger().setLevel(logging.INFO) -# process configs -config_dir = args.c -config = confini.Config(config_dir, os.environ.get('CONFINI_ENV_PREFIX')) +config = Config(args.c, args.env_prefix) config.process() config.censor('PASSWORD', 'DATABASE') -logg.debug('config loaded from {}:\n{}'.format(args.c, config)) - -celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL')) -status = get_celery_worker_status(celery_app=celery_app) - - -db_configs = { - 'database': config.get('DATABASE_NAME'), - 'host': config.get('DATABASE_HOST'), - 
'port': config.get('DATABASE_PORT'), - 'user': config.get('DATABASE_USER'), - 'password': config.get('DATABASE_PASSWORD') -} +logg.debug(f'config loaded from {args.c}:\n{config}') def main(): - with open(f'{args.pins_dir}/pins.csv') as pins_file: + with open(f'{args.import_dir}/pins.csv') as pins_file: phone_to_pins = [tuple(row) for row in csv.reader(pins_file)] - s_import_pins = celery.signature( - 'import_task.set_pins', - (db_configs, phone_to_pins), - queue=args.q + db_conn = psycopg2.connect( + database=config.get('DATABASE_NAME'), + host=config.get('DATABASE_HOST'), + port=config.get('DATABASE_PORT'), + user=config.get('DATABASE_USER'), + password=config.get('DATABASE_PASSWORD') ) - result = s_import_pins.apply_async() - logg.debug(f'TASK: {result.id}, STATUS: {result.status}') + db_cursor = db_conn.cursor() + sql = 'UPDATE account SET password_hash = %s WHERE phone_number = %s' + for element in phone_to_pins: + db_cursor.execute(sql, (element[1], element[0])) + logg.debug(f'Updating account: {element[0]} with: {element[1]}') + db_conn.commit() + db_cursor.close() + db_conn.close() if __name__ == '__main__': diff --git a/apps/data-seeding/cic_ussd/import_task.py b/apps/data-seeding/cic_ussd/import_task.py index d04f6dea..00e72f1a 100644 --- a/apps/data-seeding/cic_ussd/import_task.py +++ b/apps/data-seeding/cic_ussd/import_task.py @@ -1,38 +1,37 @@ # standard imports +import csv import json import logging import os import random -import urllib.error -import urllib.parse -import urllib.request +import uuid +from urllib import error, parse, request # external imports import celery import psycopg2 +from celery import Task +from chainlib.chain import ChainSpec from chainlib.eth.address import to_checksum_address -from chainlib.eth.tx import ( - unpack, - raw, -) -from cic_types.models.person import Person -from cic_types.processor import generate_metadata_pointer -from hexathon import ( - strip_0x, - add_0x, -) +from chainlib.eth.tx import raw, unpack +from cic_types.models.person import Person, generate_metadata_pointer +from hexathon import add_0x, strip_0x + +# local imports -logg = logging.getLogger() celery_app = celery.current_app +logg = logging.getLogger() -class ImportTask(celery.Task): +class ImportTask(Task): balances = None - import_dir = 'out' - count = 0 - chain_spec = None balance_processor = None + chain_spec: ChainSpec = None + count = 0 + db_config: dict = None + import_dir = '' + include_balances = False max_retries = None @@ -41,121 +40,70 @@ class MetadataTask(ImportTask): meta_port = None meta_path = '' meta_ssl = False - autoretry_for = ( - urllib.error.HTTPError, - OSError, - ) + autoretry_for = (error.HTTPError, OSError,) retry_jitter = True retry_backoff = True retry_backoff_max = 60 @classmethod - def meta_url(self): + def meta_url(cls): scheme = 'http' - if self.meta_ssl: + if cls.meta_ssl: scheme += 's' - url = urllib.parse.urlparse('{}://{}:{}/{}'.format(scheme, self.meta_host, self.meta_port, self.meta_path)) - return urllib.parse.urlunparse(url) + url = parse.urlparse(f'{scheme}://{cls.meta_host}:{cls.meta_port}/{cls.meta_path}') + return parse.urlunparse(url) -def old_address_from_phone(base_path, phone): - pidx = generate_metadata_pointer(phone.encode('utf-8'), ':cic.phone') - phone_idx_path = os.path.join('{}/phone/{}/{}/{}'.format( - base_path, - pidx[:2], - pidx[2:4], - pidx, - ) - ) - f = open(phone_idx_path, 'r') - old_address = f.read() - f.close() - +def old_address_from_phone(base_path: str, phone_number: str): + pid_x = 
generate_metadata_pointer(phone_number.encode('utf-8'), ':cic.phone') + phone_idx_path = os.path.join(f'{base_path}/phone/{pid_x[:2]}/{pid_x[2:4]}/{pid_x}') + with open(phone_idx_path, 'r') as f: + old_address = f.read() return old_address @celery_app.task(bind=True, base=MetadataTask) -def resolve_phone(self, phone): - identifier = generate_metadata_pointer(phone.encode('utf-8'), ':cic.phone') - url = urllib.parse.urljoin(self.meta_url(), identifier) - logg.debug('attempt getting phone pointer at {} for phone {}'.format(url, phone)) - r = urllib.request.urlopen(url) - address = json.load(r) - address = address.replace('"', '') - logg.debug('address {} for phone {}'.format(address, phone)) - - return address - - -@celery_app.task(bind=True, base=MetadataTask) -def generate_metadata(self, address, phone): - old_address = old_address_from_phone(self.import_dir, phone) - - logg.debug('address {}'.format(address)) - old_address_upper = strip_0x(old_address).upper() - metadata_path = '{}/old/{}/{}/{}.json'.format( - self.import_dir, - old_address_upper[:2], - old_address_upper[2:4], - old_address_upper, - ) - - f = open(metadata_path, 'r') - o = json.load(f) - f.close() - - u = Person.deserialize(o) - - if u.identities.get('evm') == None: - u.identities['evm'] = {} - sub_chain_str = '{}:{}'.format(self.chain_spec.common_name(), self.chain_spec.network_id()) - u.identities['evm'][sub_chain_str] = [add_0x(address)] - - new_address_clean = strip_0x(address) - filepath = os.path.join( +def generate_person_metadata(self, blockchain_address: str, phone_number: str): + logg.debug(f'blockchain address: {blockchain_address}') + old_blockchain_address = old_address_from_phone(self.import_dir, phone_number) + old_address_upper = strip_0x(old_blockchain_address).upper() + metadata_path = f'{self.import_dir}/old/{old_address_upper[:2]}/{old_address_upper[2:4]}/{old_address_upper}.json' + with open(metadata_path, 'r') as metadata_file: + person_metadata = json.load(metadata_file) + person = Person.deserialize(person_metadata) + if not person.identities.get('evm'): + person.identities['evm'] = {} + sub_chain_str = f'{self.chain_spec.common_name()}:{self.chain_spec.network_id()}' + person.identities['evm'][sub_chain_str] = [add_0x(blockchain_address)] + blockchain_address = strip_0x(blockchain_address) + file_path = os.path.join( self.import_dir, 'new', - new_address_clean[:2].upper(), - new_address_clean[2:4].upper(), - new_address_clean.upper() + '.json', + blockchain_address[:2].upper(), + blockchain_address[2:4].upper(), + blockchain_address.upper() + '.json' ) - os.makedirs(os.path.dirname(filepath), exist_ok=True) - - o = u.serialize() - f = open(filepath, 'w') - f.write(json.dumps(o)) - f.close() - - meta_key = generate_metadata_pointer(bytes.fromhex(new_address_clean), ':cic.person') + os.makedirs(os.path.dirname(file_path), exist_ok=True) + serialized_person_metadata = person.serialize() + with open(file_path, 'w') as metadata_file: + metadata_file.write(json.dumps(serialized_person_metadata)) + logg.debug(f'written person metadata for address: {blockchain_address}') meta_filepath = os.path.join( self.import_dir, 'meta', - '{}.json'.format(new_address_clean.upper()), + '{}.json'.format(blockchain_address.upper()), ) - os.symlink(os.path.realpath(filepath), meta_filepath) + os.symlink(os.path.realpath(file_path), meta_filepath) + return blockchain_address - # write ussd data - ussd_data = { - 'phone': phone, - 'is_activated': 1, - 'preferred_language': random.sample(['en', 'sw'], 1)[0], - 
'is_disabled': False
-    }
-    ussd_data_dir = os.path.join(self.import_dir, 'ussd')
-    ussd_data_file_path = os.path.join(ussd_data_dir, f'{old_address}.json')
-    f = open(ussd_data_file_path, 'w')
-    f.write(json.dumps(ussd_data))
-    f.close()
-    # write preferences data
+
+@celery_app.task(bind=True, base=MetadataTask)
+def generate_preferences_data(self, data: tuple):
+    blockchain_address: str = data[0]
+    preferences = data[1]
     preferences_dir = os.path.join(self.import_dir, 'preferences')
-    preferences_data = {
-        'preferred_language': ussd_data['preferred_language']
-    }
-
-    preferences_key = generate_metadata_pointer(bytes.fromhex(new_address_clean[2:]), ':cic.preferences')
+    preferences_key = generate_metadata_pointer(bytes.fromhex(strip_0x(blockchain_address)), ':cic.preferences')
     preferences_filepath = os.path.join(preferences_dir, 'meta', preferences_key)
-
     filepath = os.path.join(
         preferences_dir,
         'new',
@@ -164,95 +112,95 @@ def generate_metadata(self, address, phone):
         preferences_key.upper() + '.json'
     )
     os.makedirs(os.path.dirname(filepath), exist_ok=True)
-
-    f = open(filepath, 'w')
-    f.write(json.dumps(preferences_data))
-    f.close()
+    with open(filepath, 'w') as preferences_file:
+        preferences_file.write(json.dumps(preferences))
+    logg.debug(f'written preferences metadata: {preferences} for address: {blockchain_address}')
     os.symlink(os.path.realpath(filepath), preferences_filepath)
-
-    logg.debug('found metadata {} for phone {}'.format(o, phone))
-
-    return address
+    return blockchain_address
 
 
 @celery_app.task(bind=True, base=MetadataTask)
-def opening_balance_tx(self, address, phone, serial):
-    old_address = old_address_from_phone(self.import_dir, phone)
+def generate_pins_data(self, blockchain_address: str, phone_number: str):
+    pins_file = f'{self.import_dir}/pins.csv'
+    file_op = 'a' if os.path.exists(pins_file) else 'w'
+    with open(pins_file, file_op) as pins_csv:
+        password_hash = uuid.uuid4().hex
+        pins_csv.write(f'{phone_number},{password_hash}\n')
+    logg.debug(f'written pin data for address: {blockchain_address}')
+    return blockchain_address
 
-    k = to_checksum_address(strip_0x(old_address))
-    balance = self.balances[k]
-    logg.debug('found balance {} for address {} phone {}'.format(balance, old_address, phone))
 
+@celery_app.task(bind=True, base=MetadataTask)
+def generate_ussd_data(self, blockchain_address: str, phone_number: str):
+    ussd_data_file = f'{self.import_dir}/ussd_data.csv'
+    file_op = 'a' if os.path.exists(ussd_data_file) else 'w'
+    preferred_language = random.sample(["en", "sw"], 1)[0]
+    preferences = {'preferred_language': preferred_language}
+    with open(ussd_data_file, file_op) as ussd_data_csv:
+        ussd_data_csv.write(f'{phone_number},1,{preferred_language},False\n')
+    logg.debug(f'written ussd data for address: {blockchain_address}')
+    return blockchain_address, preferences
+
+
+@celery_app.task(bind=True, base=MetadataTask)
+def opening_balance_tx(self, blockchain_address: str, phone_number: str, serial: str):
+    old_blockchain_address = old_address_from_phone(self.import_dir, phone_number)
+    address = to_checksum_address(strip_0x(old_blockchain_address))
+    balance = self.balances[address]
+    logg.debug(f'found balance: {balance} for address: {address} phone: {phone_number}')
     decimal_balance = self.balance_processor.get_decimal_amount(int(balance))
-
-    (tx_hash_hex, o) = self.balance_processor.get_rpc_tx(address, decimal_balance, serial)
-
+    tx_hash_hex, o = self.balance_processor.get_rpc_tx(blockchain_address, decimal_balance, serial)
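+    # the signed tx is written under txs/ keyed by its hash, with a hidden
+    # .<nonce> symlink pointing at it so that send_txs() below can replay the
+    # queued transactions in strict nonce order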
+    tx = unpack(bytes.fromhex(strip_0x(o)), self.chain_spec)
-    logg.debug('generated tx token value {} to {} tx hash {}'.format(decimal_balance, address, tx_hash_hex))
-
-    tx_path = os.path.join(
-        self.import_dir,
-        'txs',
-        strip_0x(tx_hash_hex),
-    )
-
-    f = open(tx_path, 'w')
-    f.write(strip_0x(o))
-    f.close()
-
-    tx_nonce_path = os.path.join(
-        self.import_dir,
-        'txs',
-        '.' + str(tx['nonce']),
-    )
+    logg.debug(f'generated tx token value: {decimal_balance}: {blockchain_address} tx hash {tx_hash_hex}')
+    tx_path = os.path.join(self.import_dir, 'txs', strip_0x(tx_hash_hex))
+    with open(tx_path, 'w') as tx_file:
+        tx_file.write(strip_0x(o))
+    logg.debug(f'written tx with tx hash: {tx["hash"]} for address: {blockchain_address}')
+    tx_nonce_path = os.path.join(self.import_dir, 'txs', '.' + str(tx['nonce']))
     os.symlink(os.path.realpath(tx_path), tx_nonce_path)
-
     return tx['hash']
 
 
-@celery_app.task(bind=True, base=ImportTask, autoretry_for=(FileNotFoundError,), max_retries=None,
+@celery_app.task(bind=True, base=MetadataTask)
+def resolve_phone(self, phone_number: str):
+    identifier = generate_metadata_pointer(phone_number.encode('utf-8'), ':cic.phone')
+    url = parse.urljoin(self.meta_url(), identifier)
+    logg.debug(f'attempt getting phone pointer at: {url} for phone: {phone_number}')
+    r = request.urlopen(url)
+    address = json.load(r)
+    address = address.replace('"', '')
+    logg.debug(f'address: {address} for phone: {phone_number}')
+    return address
+
+
+@celery_app.task(autoretry_for=(FileNotFoundError,),
+                 bind=True,
+                 base=ImportTask,
+                 max_retries=None,
                  default_retry_delay=0.1)
 def send_txs(self, nonce):
-    if nonce == self.count + self.balance_processor.nonce_offset:
-        logg.info('reached nonce {} (offset {} + count {}) exiting'.format(nonce, self.balance_processor.nonce_offset,
-                                                                           self.count))
-        return
-
-    logg.debug('attempt to open symlink for nonce {}'.format(nonce))
-    tx_nonce_path = os.path.join(
-        self.import_dir,
-        'txs',
-        '.' + str(nonce),
-    )
-    f = open(tx_nonce_path, 'r')
-    tx_signed_raw_hex = f.read()
-    f.close()
-
-    os.unlink(tx_nonce_path)
-
-    o = raw(add_0x(tx_signed_raw_hex))
-    tx_hash_hex = self.balance_processor.conn.do(o)
-
-    logg.info('sent nonce {} tx hash {}'.format(nonce, tx_hash_hex))  # tx_signed_raw_hex))
-
-    nonce += 1
-    queue = self.request.delivery_info.get('routing_key')
-    s = celery.signature(
-        'import_task.send_txs',
-        [
-            nonce,
-        ],
-        queue=queue,
-    )
-    s.apply_async()
+    queue = self.request.delivery_info.get('routing_key')
+    if nonce == self.count + self.balance_processor.nonce_offset:
+        logg.info(f'reached nonce {nonce} (offset {self.balance_processor.nonce_offset} + count {self.count}).')
+        celery_app.control.broadcast('shutdown', destination=[f'celery@{queue}'])
+        return nonce
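+    # send_txs re-queues itself for nonce + 1 after every send; a missing
+    # .<nonce> symlink raises FileNotFoundError, which autoretry_for above
+    # retries until opening_balance_tx has written the next transaction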
+    logg.debug(f'attempt to open symlink for nonce {nonce}')
+    tx_nonce_path = os.path.join(self.import_dir, 'txs', '.' + str(nonce))
+    with open(tx_nonce_path, 'r') as tx_nonce_file:
+        tx_signed_raw_hex = tx_nonce_file.read()
+    os.unlink(tx_nonce_path)
+    o = raw(add_0x(tx_signed_raw_hex))
+    if self.include_balances:
+        tx_hash_hex = self.balance_processor.conn.do(o)
+        logg.info(f'sent nonce {nonce} tx hash {tx_hash_hex}')
+    nonce += 1
+    s = celery.signature('import_task.send_txs', [nonce], queue=queue)
+    s.apply_async()
     return nonce
 
 
-@celery_app.task
-def set_pins(config: dict, phone_to_pins: list):
-    # define db connection
+@celery_app.task()
+def set_pin_data(config: dict, phone_to_pins: list):
     db_conn = psycopg2.connect(
         database=config.get('database'),
         host=config.get('host'),
@@ -261,24 +209,17 @@ def set_pins(config: dict, phone_to_pins: list):
         password=config.get('password')
     )
     db_cursor = db_conn.cursor()
-
-    # update db
+    sql = 'UPDATE account SET password_hash = %s WHERE phone_number = %s'
     for element in phone_to_pins:
-        sql = 'UPDATE account SET password_hash = %s WHERE phone_number = %s'
         db_cursor.execute(sql, (element[1], element[0]))
         logg.debug(f'Updating: {element[0]} with: {element[1]}')
-
-    # commit changes
     db_conn.commit()
-
-    # close connections
     db_cursor.close()
     db_conn.close()
 
 
 @celery_app.task
-def set_ussd_data(config: dict, ussd_data: dict):
-    # define db connection
+def set_ussd_data(config: dict, ussd_data: list):
     db_conn = psycopg2.connect(
         database=config.get('database'),
         host=config.get('host'),
@@ -287,20 +228,12 @@ def set_ussd_data(config: dict, ussd_data: dict):
         password=config.get('password')
     )
     db_cursor = db_conn.cursor()
-
-    # process ussd_data
-    account_status = 1
-    if ussd_data['is_activated'] == 1:
-        account_status = 2
-    preferred_language = ussd_data['preferred_language']
-    phone_number = ussd_data['phone']
-    sql = 'UPDATE account SET status = %s, preferred_language = %s WHERE phone_number = %s'
-    db_cursor.execute(sql, (account_status, preferred_language, phone_number))
-
-    # commit changes
+    sql = 'UPDATE account SET status = %s, preferred_language = %s WHERE phone_number = %s'
+    for element in ussd_data:
+        status = 2 if int(element[1]) == 1 else 1
+        preferred_language = element[2]
+        phone_number = element[0]
+        db_cursor.execute(sql, (status, preferred_language, phone_number))
     db_conn.commit()
-
-    # close connections
     db_cursor.close()
     db_conn.close()
diff --git a/apps/data-seeding/cic_ussd/import_users.py b/apps/data-seeding/cic_ussd/import_users.py
index 224d090b..72013737 100644
--- a/apps/data-seeding/cic_ussd/import_users.py
+++ b/apps/data-seeding/cic_ussd/import_users.py
@@ -3,56 +3,61 @@ import argparse
 import json
 import logging
 import os
+import redis
 import sys
 import time
-import urllib.request
 import uuid
+from urllib import request
 from urllib.parse import urlencode
 
 # external imports
 import celery
-import confini
 import phonenumbers
-import redis
-from chainlib.chain import ChainSpec
 from cic_types.models.person import Person
+from confini import Config
 
 # local imports
 from import_util import get_celery_worker_status
 
+default_config_dir = './config'
 logging.basicConfig(level=logging.WARNING)
 logg = logging.getLogger()
-default_config_dir = '/usr/local/etc/cic'
 
+arg_parser = argparse.ArgumentParser(description='Daemon worker that handles data seeding tasks.')
+# batch size should be slightly below cumulative gas limit worth, eg 80000 gas txs with 8000000 limit is a bit less than 100 batch size
+arg_parser.add_argument('--batch-size',
+                        dest='batch_size',
+                        default=100,
+                        type=int,
+                        help='burst size of sending transactions to node')
+arg_parser.add_argument('--batch-delay', dest='batch_delay', default=3, type=int, help='seconds delay between batches')
+arg_parser.add_argument('-c', type=str, default=default_config_dir, help='config root to use.')
+arg_parser.add_argument('--env-prefix',
+                        default=os.environ.get('CONFINI_ENV_PREFIX'),
+                        dest='env_prefix',
+                        type=str,
+                        help='environment prefix for variables to overwrite configuration.')
+arg_parser.add_argument('-i', '--chain-spec', type=str, dest='i', help='chain spec')
+arg_parser.add_argument('-q', type=str, default='cic-import-ussd', help='celery queue to submit data seeding tasks to.')
+arg_parser.add_argument('--redis-db', dest='redis_db', type=int, help='redis db to use for task submission and callback')
+arg_parser.add_argument('--redis-host', dest='redis_host', type=str, help='redis host to use for task submission')
+arg_parser.add_argument('--redis-port', dest='redis_port', type=int, help='redis port to use for task submission')
+arg_parser.add_argument('--ussd-host', dest='ussd_host', type=str,
+                        help="host to ussd app responsible for processing ussd requests.")
+arg_parser.add_argument('--ussd-no-ssl', dest='ussd_no_ssl', help='do not use ssl (careful)', action='store_true')
+arg_parser.add_argument('--ussd-port', dest='ussd_port', type=str,
+                        help="port to ussd app responsible for processing ussd requests.")
+arg_parser.add_argument('-v', help='be verbose', action='store_true')
+arg_parser.add_argument('-vv', help='be more verbose', action='store_true')
+arg_parser.add_argument('import_dir', default='out', type=str, help='user export directory')
+args = arg_parser.parse_args()
 
-argparser = argparse.ArgumentParser()
-argparser.add_argument('-c', type=str, default=default_config_dir, help='config file')
-argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='Chain specification string')
-argparser.add_argument('--redis-host', dest='redis_host', type=str, help='redis host to use for task submission')
-argparser.add_argument('--redis-port', dest='redis_port', type=int, help='redis host to use for task submission')
-argparser.add_argument('--redis-db', dest='redis_db', type=int, help='redis db to use for task submission and callback')
-argparser.add_argument('--batch-size', dest='batch_size', default=100, type=int,
-                       help='burst size of sending transactions to node')  # batch size should be slightly below cumulative gas limit worth, eg 80000 gas txs with 8000000 limit is a bit less than 100 batch size
-argparser.add_argument('--batch-delay', dest='batch_delay', default=3, type=int, help='seconds delay between batches')
-argparser.add_argument('--timeout', default=60.0, type=float, help='Callback timeout')
-argparser.add_argument('--ussd-host', dest='ussd_host', type=str,
-                       help="host to ussd app responsible for processing ussd requests.")
-argparser.add_argument('--ussd-port', dest='ussd_port', type=str,
-                       help="port to ussd app responsible for processing ussd requests.")
-argparser.add_argument('--ussd-no-ssl', dest='ussd_no_ssl', help='do not use ssl (careful)', action='store_true')
-argparser.add_argument('-q', type=str, default='cic-eth', help='Task queue')
-argparser.add_argument('-v', action='store_true', help='Be verbose')
-argparser.add_argument('-vv', action='store_true', help='Be more verbose')
-argparser.add_argument('user_dir', type=str, help='path to users export dir tree')
-args = argparser.parse_args()
 
+if args.vv:
+    logging.getLogger().setLevel(logging.DEBUG)
+elif args.v:
+    logging.getLogger().setLevel(logging.INFO)
 
-if args.v:
-    logg.setLevel(logging.INFO)
-elif args.vv:
-    logg.setLevel(logging.DEBUG)
-
-config_dir = args.c
-config = 
confini.Config(config_dir, os.environ.get('CONFINI_ENV_PREFIX')) +config = Config(args.c, args.env_prefix) config.process() args_override = { 'CIC_CHAIN_SPEC': getattr(args, 'i'), @@ -60,44 +65,29 @@ args_override = { 'REDIS_PORT': getattr(args, 'redis_port'), 'REDIS_DB': getattr(args, 'redis_db'), } -config.dict_override(args_override, 'cli') -logg.debug('config loaded from {}:\n{}'.format(args.c, config)) +config.dict_override(args_override, 'cli flag') +config.censor('PASSWORD', 'DATABASE') +config.censor('PASSWORD', 'SSL') +logg.debug(f'config loaded from {args.c}:\n{config}') -celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL')) -get_celery_worker_status(celery_app=celery_app) +old_account_dir = os.path.join(args.import_dir, 'old') +os.stat(old_account_dir) +logg.debug(f'created old system data dir: {old_account_dir}') -redis_host = config.get('REDIS_HOST') -redis_port = config.get('REDIS_PORT') -redis_db = config.get('REDIS_DB') -r = redis.Redis(redis_host, redis_port, redis_db) +new_account_dir = os.path.join(args.import_dir, 'new') +os.makedirs(new_account_dir, exist_ok=True) +logg.debug(f'created new system data dir: {new_account_dir}') -ps = r.pubsub() +person_metadata_dir = os.path.join(args.import_dir, 'meta') +os.makedirs(person_metadata_dir, exist_ok=True) +logg.debug(f'created person metadata dir: {person_metadata_dir}') -user_new_dir = os.path.join(args.user_dir, 'new') -os.makedirs(user_new_dir, exist_ok=True) - -ussd_data_dir = os.path.join(args.user_dir, 'ussd') -os.makedirs(ussd_data_dir, exist_ok=True) - -preferences_dir = os.path.join(args.user_dir, 'preferences') +preferences_dir = os.path.join(args.import_dir, 'preferences') os.makedirs(os.path.join(preferences_dir, 'meta'), exist_ok=True) +logg.debug(f'created preferences metadata dir: {preferences_dir}') -meta_dir = os.path.join(args.user_dir, 'meta') -os.makedirs(meta_dir, exist_ok=True) +valid_service_codes = config.get('USSD_SERVICE_CODE').split(",") -user_old_dir = os.path.join(args.user_dir, 'old') -os.stat(user_old_dir) - -txs_dir = os.path.join(args.user_dir, 'txs') -os.makedirs(txs_dir, exist_ok=True) - -chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC')) -chain_str = str(chain_spec) - -batch_size = args.batch_size -batch_delay = args.batch_delay -ussd_port = args.ussd_port -ussd_host = args.ussd_host ussd_no_ssl = args.ussd_no_ssl if ussd_no_ssl is True: ussd_ssl = False @@ -105,7 +95,17 @@ else: ussd_ssl = True -def build_ussd_request(phone, host, port, service_code, username, password, ssl=False): +celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL')) +get_celery_worker_status(celery_app) + + +def build_ussd_request(host: str, + password: str, + phone_number: str, + port: str, + service_code: str, + username: str, + ssl: bool = False): url = 'http' if ssl: url += 's' @@ -115,16 +115,16 @@ def build_ussd_request(phone, host, port, service_code, username, password, ssl= url += '/?username={}&password={}'.format(username, password) logg.info('ussd service url {}'.format(url)) - logg.info('ussd phone {}'.format(phone)) + logg.info('ussd phone {}'.format(phone_number)) session = uuid.uuid4().hex data = { 'sessionId': session, 'serviceCode': service_code, - 'phoneNumber': phone, + 'phoneNumber': phone_number, 'text': service_code, } - req = urllib.request.Request(url) + req = request.Request(url) req.method = 'POST' data_str = urlencode(data) data_bytes = data_str.encode('utf-8') @@ 
-134,85 +134,77 @@ def build_ussd_request(phone, host, port, service_code, username, password, ssl=
     return req
 
 
-def register_ussd(i, u):
-    phone_object = phonenumbers.parse(u.tel)
-    phone = phonenumbers.format_number(phone_object, phonenumbers.PhoneNumberFormat.E164)
-    logg.debug('tel {} {}'.format(u.tel, phone))
-    req = build_ussd_request(
-        phone,
-        ussd_host,
-        ussd_port,
-        config.get('APP_SERVICE_CODE'),
-        '',
-        '',
-        ussd_ssl
-    )
-    response = urllib.request.urlopen(req)
+def e164_phone_number(phone_number: str):
+    phone_object = phonenumbers.parse(phone_number)
+    return phonenumbers.format_number(phone_object, phonenumbers.PhoneNumberFormat.E164)
+
+
+def register_account(person: Person):
+    phone_number = e164_phone_number(person.tel)
+    logg.debug(f'tel: {phone_number}')
+    req = build_ussd_request(args.ussd_host,
+                             '',
+                             phone_number,
+                             args.ussd_port,
+                             valid_service_codes[0],
+                             '',
+                             ussd_ssl)
+    response = request.urlopen(req)
     response_data = response.read().decode('utf-8')
-    state = response_data[:3]
-    out = response_data[4:]
-    logg.debug('ussd reponse: {}'.format(out))
+    logg.debug(f'ussd response: {response_data[4:]}')
 
 
 if __name__ == '__main__':
     i = 0
     j = 0
-    for x in os.walk(user_old_dir):
+    for x in os.walk(old_account_dir):
         for y in x[2]:
             if y[len(y) - 5:] != '.json':
                 continue
-            # handle json containing person object
-            filepath = os.path.join(x[0], y)
-            f = open(filepath, 'r')
-            try:
-                o = json.load(f)
-            except json.decoder.JSONDecodeError as e:
-                f.close()
-                logg.error('load error for {}: {}'.format(y, e))
-                continue
-            f.close()
-            u = Person.deserialize(o)
-            register_ussd(i, u)
-
-            phone_object = phonenumbers.parse(u.tel)
-            phone = phonenumbers.format_number(phone_object, phonenumbers.PhoneNumberFormat.E164)
-
-            s_phone = celery.signature(
-                'import_task.resolve_phone',
-                [
-                    phone,
-                ],
-                queue='cic-import-ussd',
+            file_path = os.path.join(x[0], y)
+            with open(file_path, 'r') as account_file:
+                try:
+                    account_data = json.load(account_file)
+                except json.decoder.JSONDecodeError as e:
+                    logg.error('load error for {}: {}'.format(y, e))
+                    continue
+            person = Person.deserialize(account_data)
+            register_account(person)
+            phone_number = e164_phone_number(person.tel)
+            s_resolve_phone = celery.signature(
+                'import_task.resolve_phone', [phone_number], queue=args.q
             )
-            s_meta = celery.signature(
-                'import_task.generate_metadata',
-                [
-                    phone,
-                ],
-                queue='cic-import-ussd',
+            s_person_metadata = celery.signature(
+                'import_task.generate_person_metadata', [phone_number], queue=args.q
             )
-            s_balance = celery.signature(
-                'import_task.opening_balance_tx',
-                [
-                    phone,
-                    i,
-                ],
-                queue='cic-import-ussd',
+            s_ussd_data = celery.signature(
+                'import_task.generate_ussd_data', [phone_number], queue=args.q
             )
-            s_meta.link(s_balance)
-            s_phone.link(s_meta)
-            # block time plus a bit of time for ussd processing
-            s_phone.apply_async(countdown=7)
+            s_preferences_metadata = celery.signature(
+                'import_task.generate_preferences_data', [], queue=args.q
+            )
+
+            s_pins_data = celery.signature(
+                'import_task.generate_pins_data', [phone_number], queue=args.q
+            )
+
+            s_opening_balance = celery.signature(
+                'import_task.opening_balance_tx', [phone_number, i], queue=args.q
+            )
+            celery.chain(s_resolve_phone,
+                         s_person_metadata,
+                         s_ussd_data,
+                         s_preferences_metadata,
+                         s_pins_data,
+                         s_opening_balance).apply_async(countdown=7)
 
             i += 1
-            sys.stdout.write('imported {} {}'.format(i, u).ljust(200) + "\r")
-
+            sys.stdout.write('imported: {} {}'.format(i, person).ljust(200) + "\r\n")
             j += 1
-            if j == 
batch_size: - time.sleep(batch_delay) + if j == args.batch_size: + time.sleep(args.batch_delay) j = 0 diff --git a/apps/data-seeding/cic_ussd/import_ussd_data.py b/apps/data-seeding/cic_ussd/import_ussd_data.py index 44f06be9..ecfab8c8 100644 --- a/apps/data-seeding/cic_ussd/import_ussd_data.py +++ b/apps/data-seeding/cic_ussd/import_ussd_data.py @@ -1,67 +1,67 @@ # standard imports import argparse -import json +import csv import logging import os +import psycopg2 # external imports -import celery from confini import Config # local imports + +default_config_dir = './config' logging.basicConfig(level=logging.WARNING) logg = logging.getLogger() -default_config_dir = '/usr/local/etc/cic' +arg_parser = argparse.ArgumentParser(description='Pins import script.') +arg_parser.add_argument('-c', type=str, default=default_config_dir, help='config root to use.') +arg_parser.add_argument('--env-prefix', + default=os.environ.get('CONFINI_ENV_PREFIX'), + dest='env_prefix', + type=str, + help='environment prefix for variables to overwrite configuration.') +arg_parser.add_argument('import_dir', default='out', type=str, help='user export directory') +arg_parser.add_argument('-v', help='be verbose', action='store_true') +arg_parser.add_argument('-vv', help='be more verbose', action='store_true') -arg_parser = argparse.ArgumentParser() -arg_parser.add_argument('-c', type=str, default=default_config_dir, help='config file') -arg_parser.add_argument('-q', type=str, default='cic-import-ussd', help='Task queue') -arg_parser.add_argument('-v', action='store_true', help='Be verbose') -arg_parser.add_argument('-vv', action='store_true', help='Be more verbose') -arg_parser.add_argument('user_dir', type=str, help='path to users export dir tree') args = arg_parser.parse_args() -if args.v: - logg.setLevel(logging.INFO) -elif args.vv: - logg.setLevel(logging.DEBUG) +if args.vv: + logging.getLogger().setLevel(logging.DEBUG) +elif args.v: + logging.getLogger().setLevel(logging.INFO) -config_dir = args.c -config = Config(config_dir, os.environ.get('CONFINI_ENV_PREFIX')) +config = Config(args.c, args.env_prefix) config.process() -logg.debug('config loaded from {}:\n{}'.format(args.c, config)) +config.censor('PASSWORD', 'DATABASE') +logg.debug(f'config loaded from {args.c}:\n{config}') -ussd_data_dir = os.path.join(args.user_dir, 'ussd') -db_configs = { - 'database': config.get('DATABASE_NAME'), - 'host': config.get('DATABASE_HOST'), - 'port': config.get('DATABASE_PORT'), - 'user': config.get('DATABASE_USER'), - 'password': config.get('DATABASE_PASSWORD') -} -celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL')) +def main(): + with open(f'{args.import_dir}/ussd_data.csv') as ussd_data_file: + ussd_data = [tuple(row) for row in csv.reader(ussd_data_file)] + + db_conn = psycopg2.connect( + database=config.get('DATABASE_NAME'), + host=config.get('DATABASE_HOST'), + port=config.get('DATABASE_PORT'), + user=config.get('DATABASE_USER'), + password=config.get('DATABASE_PASSWORD') + ) + db_cursor = db_conn.cursor() + sql = 'UPDATE account SET status = %s, preferred_language = %s WHERE phone_number = %s' + for element in ussd_data: + status = 2 if int(element[1]) == 1 else 1 + preferred_language = element[2] + phone_number = element[0] + db_cursor.execute(sql, (status, preferred_language, phone_number)) + logg.debug(f'Updating account:{phone_number} with: preferred language: {preferred_language} status: {status}.') + db_conn.commit() + db_cursor.close() + db_conn.close() + if 
__name__ == '__main__': - for x in os.walk(ussd_data_dir): - for y in x[2]: - - if y[len(y) - 5:] == '.json': - filepath = os.path.join(x[0], y) - f = open(filepath, 'r') - try: - ussd_data = json.load(f) - logg.debug(f'LOADING USSD DATA: {ussd_data}') - except json.decoder.JSONDecodeError as e: - f.close() - logg.error('load error for {}: {}'.format(y, e)) - continue - f.close() - - s_set_ussd_data = celery.signature( - 'import_task.set_ussd_data', - [db_configs, ussd_data] - ) - s_set_ussd_data.apply_async(queue='cic-import-ussd') + main() diff --git a/apps/data-seeding/config/app.ini b/apps/data-seeding/config/app.ini index 82cd4f1d..d47c406d 100644 --- a/apps/data-seeding/config/app.ini +++ b/apps/data-seeding/config/app.ini @@ -1,27 +1,4 @@ [app] -ALLOWED_IP=0.0.0.0/0 -LOCALE_FALLBACK=en -LOCALE_PATH=/usr/src/cic-ussd/var/lib/locale/ -MAX_BODY_LENGTH=1024 -PASSWORD_PEPPER=QYbzKff6NhiQzY3ygl2BkiKOpER8RE/Upqs/5aZWW+I= -SERVICE_CODE=*483*46# - -[phone_number] -REGION=KE - -[ussd] -MENU_FILE=/usr/src/data/ussd_menu.json -user = -pass = - -[statemachine] -STATES=/usr/src/cic-ussd/states/ -TRANSITIONS=/usr/src/cic-ussd/transitions/ - -[client] -host = -port = -ssl = - -[keystore] -file_path = keystore/UTC--2021-01-08T17-18-44.521011372Z--eb3907ecad74a0013c259d5874ae7f22dcbcc95c +allowed_ip=0.0.0.0/0 +max_body_length=1024 +password_pepper= diff --git a/apps/data-seeding/config/database.ini b/apps/data-seeding/config/database.ini index f464cf44..649dd17c 100644 --- a/apps/data-seeding/config/database.ini +++ b/apps/data-seeding/config/database.ini @@ -1,10 +1,10 @@ [database] -NAME=sempo -USER=postgres -PASSWORD= -HOST=localhost -PORT=5432 -ENGINE=postgresql -DRIVER=psycopg2 -DEBUG=0 -POOL_SIZE=1 +name=cic_ussd +user=postgres +password= +host=localhost +port=5432 +engine=postgresql +driver=psycopg2 +debug=0 +pool_size=1 diff --git a/apps/data-seeding/config/ussd.ini b/apps/data-seeding/config/ussd.ini new file mode 100644 index 00000000..d996d16d --- /dev/null +++ b/apps/data-seeding/config/ussd.ini @@ -0,0 +1,5 @@ +[ussd] +menu_file=data/ussd_menu.json +service_code=*483*46#,*483*061#,*384*96# +user = +pass = diff --git a/apps/data-seeding/create_import_pins.py b/apps/data-seeding/create_import_pins.py deleted file mode 100644 index d4dd9716..00000000 --- a/apps/data-seeding/create_import_pins.py +++ /dev/null @@ -1,91 +0,0 @@ -# standard imports -import argparse -import json -import logging -import os -import uuid - -# third-party imports -import bcrypt -import celery -import confini -import phonenumbers -import random -from cic_types.models.person import Person -from cryptography.fernet import Fernet - -# local imports - - -logging.basicConfig(level=logging.WARNING) -logg = logging.getLogger() - -script_dir = os.path.realpath(os.path.dirname(__file__)) -default_config_dir = os.environ.get('CONFINI_DIR', os.path.join(script_dir, 'config')) - -arg_parser = argparse.ArgumentParser() -arg_parser.add_argument('-c', type=str, default=default_config_dir, help='Config dir') -arg_parser.add_argument('-v', action='store_true', help='Be verbose') -arg_parser.add_argument('-vv', action='store_true', help='Be more verbose') -arg_parser.add_argument('--userdir', type=str, help='path to users export dir tree') -arg_parser.add_argument('pins_dir', type=str, help='path to pin export dir tree') - - -args = arg_parser.parse_args() - -if args.v: - logg.setLevel(logging.INFO) -elif args.vv: - logg.setLevel(logging.DEBUG) - -config = confini.Config(args.c, os.environ.get('CONFINI_ENV_PREFIX')) -config.process() 
-logg.info('loaded config\n{}'.format(config)) - -celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL')) - -user_dir = args.userdir -pins_dir = args.pins_dir - - -def generate_password_hash(): - key = Fernet.generate_key() - fnt = Fernet(key) - pin = str(random.randint(1000, 9999)) - return fnt.encrypt(bcrypt.hashpw(pin.encode('utf-8'), bcrypt.gensalt())).decode() - - -user_old_dir = os.path.join(user_dir, 'old') -logg.debug(f'reading user data from: {user_old_dir}') - -pins_file = open(f'{pins_dir}/pins.csv', 'w') - -if __name__ == '__main__': - - for x in os.walk(user_old_dir): - for y in x[2]: - # skip non-json files - if y[len(y) - 5:] != '.json': - continue - - # define file path for - filepath = None - if y[:15] != '_ussd_data.json': - filepath = os.path.join(x[0], y) - f = open(filepath, 'r') - try: - o = json.load(f) - except json.decoder.JSONDecodeError as e: - f.close() - logg.error('load error for {}: {}'.format(y, e)) - continue - f.close() - u = Person.deserialize(o) - - phone_object = phonenumbers.parse(u.tel) - phone = phonenumbers.format_number(phone_object, phonenumbers.PhoneNumberFormat.E164) - password_hash = uuid.uuid4().hex - pins_file.write(f'{phone},{password_hash}\n') - logg.info(f'Writing phone: {phone}, password_hash: {password_hash}') - - pins_file.close() diff --git a/apps/data-seeding/import_ussd.sh b/apps/data-seeding/import_ussd.sh new file mode 100644 index 00000000..51726c61 --- /dev/null +++ b/apps/data-seeding/import_ussd.sh @@ -0,0 +1,55 @@ +#!/usr/bin/env bash +echo "Creating seed data..." +python create_import_users.py -vv --dir "$IMPORT_DIR" "$ACCOUNT_COUNT" +wait $! +echo "Purge tasks from celery worker" +celery -A cic_ussd.import_task purge -Q "$CELERY_QUEUE" --broker redis://"$REDIS_HOST":"$REDIS_PORT" -f +echo "Start celery work and import balance job" +if [ "$INCLUDE_BALANCES" != "y" ] +then + echo "Running worker without opening balance transactions" + TARGET_TX_COUNT=$ACCOUNT_COUNT + python cic_ussd/import_balance.py -vv -c "$CONFIG" -p "$ETH_PROVIDER" -r "$CIC_REGISTRY_ADDRESS" --token-symbol "$TOKEN_SYMBOL" -y "$KEYSTORE_PATH" "$IMPORT_DIR" & +else + echo "Running worker with opening balance transactions" + TARGET_TX_COUNT=$((ACCOUNT_COUNT*2)) + python cic_ussd/import_balance.py -vv -c "$CONFIG" -p "$ETH_PROVIDER" -r "$CIC_REGISTRY_ADDRESS" --include-balances --token-symbol "$TOKEN_SYMBOL" -y "$KEYSTORE_PATH" "$IMPORT_DIR" & +fi +until [ -f ./cic-import-ussd.pid ] +do + echo "Polling for celery worker pid file..." +done +IMPORT_BALANCE_JOB=$(