diff --git a/apps/cic-ussd/cic_ussd/metadata/base.py b/apps/cic-ussd/cic_ussd/metadata/base.py index e646d8aa..3db80d43 100644 --- a/apps/cic-ussd/cic_ussd/metadata/base.py +++ b/apps/cic-ussd/cic_ussd/metadata/base.py @@ -120,7 +120,7 @@ class MetadataRequestsHandler(Metadata): data = json.loads(response_data.decode('utf-8')) if result.status_code == 200 and self.cic_type == ':cic.person': person = Person() - deserialized_person = person.deserialize(person_data=json.loads(data)) + deserialized_person = person.deserialize(person_data=data) data = json.dumps(deserialized_person.serialize()) cache_data(self.metadata_pointer, data=data) logg.debug(f'caching: {data} with key: {self.metadata_pointer}') diff --git a/apps/cic-ussd/cic_ussd/operations.py b/apps/cic-ussd/cic_ussd/operations.py index ea66825d..5f2ca7a2 100644 --- a/apps/cic-ussd/cic_ussd/operations.py +++ b/apps/cic-ussd/cic_ussd/operations.py @@ -325,6 +325,14 @@ def process_menu_interaction_requests(chain_str: str, # get user user = Account.session.query(Account).filter_by(phone_number=phone_number).first() + # retrieve and cache user's metadata + blockchain_address = user.blockchain_address + s_query_person_metadata = celery.signature( + 'cic_ussd.tasks.metadata.query_person_metadata', + [blockchain_address] + ) + s_query_person_metadata.apply_async(queue='cic-ussd') + # find any existing ussd session existing_ussd_session = UssdSession.session.query(UssdSession).filter_by( external_session_id=external_session_id).first() diff --git a/apps/cic-ussd/cic_ussd/processor.py b/apps/cic-ussd/cic_ussd/processor.py index 195343af..864cd53f 100644 --- a/apps/cic-ussd/cic_ussd/processor.py +++ b/apps/cic-ussd/cic_ussd/processor.py @@ -371,13 +371,6 @@ def process_start_menu(display_key: str, user: Account): # get operational balance operational_balance = compute_operational_balance(balances=balances_data) - # retrieve and cache account's metadata - s_query_person_metadata = celery.signature( - 'cic_ussd.tasks.metadata.query_person_metadata', - [blockchain_address] - ) - s_query_person_metadata.apply_async(queue='cic-ussd') - # retrieve and cache account's statement retrieve_account_statement(blockchain_address=blockchain_address) diff --git a/apps/contract-migration/scripts/README.md b/apps/contract-migration/scripts/README.md index 01861ac5..32dde8d6 100644 --- a/apps/contract-migration/scripts/README.md +++ b/apps/contract-migration/scripts/README.md @@ -89,7 +89,12 @@ After this step is run, you can find top-level ethereum addresses (like the cic #### Custodial provisions - +response_data = send_ussd_request(address, self.data_dir) + state = response_data[:3] + out = response_data[4:] + m = '{} {}'.format(state, out[:7]) + if m != 'CON Welcome': + raise VerifierError(response_data, 'ussd') This step is _only_ needed if you are importing using `cic_eth` or `cic_ussd` `RUN_MASK=2 docker-compose up contract-migration` @@ -104,8 +109,8 @@ If importing using `cic_eth` or `cic_ussd` also run: * cic-eth-retrier If importing using `cic_ussd` also run: -* cic-ussd-tasker -* cic-ussd-server +* cic-user-tasker +* cic-user-ussd-server * cic-notify-tasker If metadata is to be imported, also run: @@ -169,6 +174,26 @@ In second terminal: `python cic_ussd/import_users.py -v -c config out` + +##### Importing pins and ussd data (optional) +Once the user imports are complete the next step should be importing the user's pins and auxiliary ussd data. This can be done in 3 steps: + +In one terminal run: + +`python create_import_pins.py -c config -v --userdir pinsdir ` + +This script will recursively walk through all the directories defining user data in the users export directory and generate a csv file containing phone numbers and password hashes generated using fernet in a manner reflecting the nature of said hashes in the old system. +This csv file will be stored in the pins export dir defined as the positional argument. + +Once the creation of the pins file is complete, proceed to import the pins and ussd data as follows: + +- To import the pins: + +`python cic_ussd/import_pins.py -c config -v pinsdir ` + +- To import ussd data: +`python cic_ussd/import_ussd_data.py -c config -v userdir ` + The balance script is a celery task worker, and will not exit by itself in its current version. However, after it's done doing its job, you will find "reached nonce ... exiting" among the last lines of the log. The connection parameters for the `cic-ussd-server` is currently _hardcoded_ in the `import_users.py` script file. diff --git a/apps/contract-migration/scripts/cic_ussd/import_pins.py b/apps/contract-migration/scripts/cic_ussd/import_pins.py new file mode 100644 index 00000000..37bc5eb4 --- /dev/null +++ b/apps/contract-migration/scripts/cic_ussd/import_pins.py @@ -0,0 +1,70 @@ +# standard import +import argparse +import csv +import logging +import os + +# third-party imports +import celery +import confini + +# local imports +from import_task import * + +logging.basicConfig(level=logging.WARNING) +logg = logging.getLogger() + +default_config_dir = './config' + +arg_parser = argparse.ArgumentParser() +arg_parser.add_argument('-c', type=str, default=default_config_dir, help='config root to use') +arg_parser.add_argument('--env-prefix', + default=os.environ.get('CONFINI_ENV_PREFIX'), + dest='env_prefix', + type=str, + help='environment prefix for variables to overwrite configuration') +arg_parser.add_argument('-q', type=str, default='cic-import-ussd', help='celery queue to submit transaction tasks to') +arg_parser.add_argument('-v', help='be verbose', action='store_true') +arg_parser.add_argument('-vv', help='be more verbose', action='store_true') +arg_parser.add_argument('pins_dir', default='out', type=str, help='user export directory') +args = arg_parser.parse_args() + +# set log levels +if args.v: + logg.setLevel(logging.INFO) +elif args.vv: + logg.setLevel(logging.DEBUG) + +# process configs +config_dir = args.c +config = confini.Config(config_dir, os.environ.get('CONFINI_ENV_PREFIX')) +config.process() + +celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL')) + +db_configs = { + 'database': config.get('DATABASE_NAME'), + 'host': config.get('DATABASE_HOST'), + 'port': config.get('DATABASE_PORT'), + 'user': config.get('DATABASE_USER'), + 'password': config.get('DATABASE_PASSWORD') +} + + +def main(): + with open(f'{args.pins_dir}/pins.csv') as pins_file: + phone_to_pins = [tuple(row) for row in csv.reader(pins_file)] + + s_import_pins = celery.signature( + 'import_task.set_pins', + (db_configs, phone_to_pins), + queue=args.q + ) + s_import_pins.apply_async() + + argv = ['worker', '-Q', 'cic-import-ussd', '--loglevel=DEBUG'] + celery_app.worker_main(argv) + + +if __name__ == '__main__': + main() diff --git a/apps/contract-migration/scripts/cic_ussd/import_task.py b/apps/contract-migration/scripts/cic_ussd/import_task.py index a3f2abf0..bc06b4e8 100644 --- a/apps/contract-migration/scripts/cic_ussd/import_task.py +++ b/apps/contract-migration/scripts/cic_ussd/import_task.py @@ -8,6 +8,8 @@ import json # external imports import celery +import psycopg2 +from psycopg2 import extras from hexathon import ( strip_0x, add_0x, @@ -53,7 +55,7 @@ class MetadataTask(ImportTask): def meta_url(self): scheme = 'http' if self.meta_ssl: - scheme += s + scheme += 's' url = urllib.parse.urlparse('{}://{}:{}/{}'.format(scheme, self.meta_host, self.meta_port, self.meta_path)) return urllib.parse.urlunparse(url) @@ -91,7 +93,6 @@ def resolve_phone(self, phone): def generate_metadata(self, address, phone): old_address = old_address_from_phone(self.import_dir, phone) - logg.debug('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> foo') logg.debug('address {}'.format(address)) old_address_upper = strip_0x(old_address).upper() metadata_path = '{}/old/{}/{}/{}.json'.format( @@ -216,3 +217,60 @@ def send_txs(self, nonce): return nonce + + +@celery_app.task +def set_pins(config: dict, phone_to_pins: list): + # define db connection + db_conn = psycopg2.connect( + database=config.get('database'), + host=config.get('host'), + port=config.get('port'), + user=config.get('user'), + password=config.get('password') + ) + db_cursor = db_conn.cursor() + + # update db + for element in phone_to_pins: + sql = 'UPDATE account SET password_hash = %s WHERE phone_number = %s' + db_cursor.execute(sql, (element[1], element[0])) + logg.debug(f'Updating: {element[0]} with: {element[1]}') + + # commit changes + db_conn.commit() + + # close connections + db_cursor.close() + db_conn.close() + + +@celery_app.task +def set_ussd_data(config: dict, ussd_data: dict): + # define db connection + db_conn = psycopg2.connect( + database=config.get('database'), + host=config.get('host'), + port=config.get('port'), + user=config.get('user'), + password=config.get('password') + ) + db_cursor = db_conn.cursor() + + # process ussd_data + account_status = 1 + if ussd_data['is_activated'] == 1: + account_status = 2 + preferred_language = ussd_data['preferred_language'] + phone_number = ussd_data['phone'] + + sql = 'UPDATE account SET account_status = %s, preferred_language = %s WHERE phone_number = %s' + db_cursor.execute(sql, (account_status, preferred_language, phone_number)) + + # commit changes + db_conn.commit() + + # close connections + db_cursor.close() + db_conn.close() + diff --git a/apps/contract-migration/scripts/cic_ussd/import_users.py b/apps/contract-migration/scripts/cic_ussd/import_users.py index ca9bf3f1..1898c17c 100644 --- a/apps/contract-migration/scripts/cic_ussd/import_users.py +++ b/apps/contract-migration/scripts/cic_ussd/import_users.py @@ -87,6 +87,13 @@ chain_str = str(chain_spec) batch_size = args.batch_size batch_delay = args.batch_delay +db_configs = { + 'database': config.get('DATABASE_NAME'), + 'host': config.get('DATABASE_HOST'), + 'port': config.get('DATABASE_PORT'), + 'user': config.get('DATABASE_USER'), + 'password': config.get('DATABASE_PASSWORD') +} def build_ussd_request(phone, host, port, service_code, username, password, ssl=False): @@ -135,57 +142,60 @@ if __name__ == '__main__': for y in x[2]: if y[len(y)-5:] != '.json': continue - filepath = os.path.join(x[0], y) - f = open(filepath, 'r') - try: - o = json.load(f) - except json.decoder.JSONDecodeError as e: + # handle json containing person object + filepath = None + if y[:15] != '_ussd_data.json': + filepath = os.path.join(x[0], y) + f = open(filepath, 'r') + try: + o = json.load(f) + except json.decoder.JSONDecodeError as e: + f.close() + logg.error('load error for {}: {}'.format(y, e)) + continue f.close() - logg.error('load error for {}: {}'.format(y, e)) - continue - f.close() - u = Person.deserialize(o) + u = Person.deserialize(o) - new_address = register_ussd(i, u) + new_address = register_ussd(i, u) - phone_object = phonenumbers.parse(u.tel) - phone = phonenumbers.format_number(phone_object, phonenumbers.PhoneNumberFormat.E164) + phone_object = phonenumbers.parse(u.tel) + phone = phonenumbers.format_number(phone_object, phonenumbers.PhoneNumberFormat.E164) - s_phone = celery.signature( - 'import_task.resolve_phone', - [ - phone, - ], - queue='cic-import-ussd', - ) + s_phone = celery.signature( + 'import_task.resolve_phone', + [ + phone, + ], + queue='cic-import-ussd', + ) - s_meta = celery.signature( - 'import_task.generate_metadata', - [ - phone, - ], - queue='cic-import-ussd', - ) + s_meta = celery.signature( + 'import_task.generate_metadata', + [ + phone, + ], + queue='cic-import-ussd', + ) - s_balance = celery.signature( - 'import_task.opening_balance_tx', - [ - phone, - i, - ], - queue='cic-import-ussd', - ) + s_balance = celery.signature( + 'import_task.opening_balance_tx', + [ + phone, + i, + ], + queue='cic-import-ussd', + ) - s_meta.link(s_balance) - s_phone.link(s_meta) - s_phone.apply_async(countdown=7) # block time plus a bit of time for ussd processing + s_meta.link(s_balance) + s_phone.link(s_meta) + # block time plus a bit of time for ussd processing + s_phone.apply_async(countdown=7) - i += 1 - sys.stdout.write('imported {} {}'.format(i, u).ljust(200) + "\r") - - j += 1 - if j == batch_size: - time.sleep(batch_delay) - j = 0 + i += 1 + sys.stdout.write('imported {} {}'.format(i, u).ljust(200) + "\r") + + j += 1 + if j == batch_size: + time.sleep(batch_delay) + j = 0 - #fi.close() diff --git a/apps/contract-migration/scripts/cic_ussd/import_ussd_data.py b/apps/contract-migration/scripts/cic_ussd/import_ussd_data.py new file mode 100644 index 00000000..d2283000 --- /dev/null +++ b/apps/contract-migration/scripts/cic_ussd/import_ussd_data.py @@ -0,0 +1,70 @@ +# standard imports +import argparse +import json +import logging +import os + +# external imports +import celery +from confini import Config + +# local imports + +logging.basicConfig(level=logging.WARNING) +logg = logging.getLogger() + +default_config_dir = '/usr/local/etc/cic' + +arg_parser = argparse.ArgumentParser() +arg_parser.add_argument('-c', type=str, default=default_config_dir, help='config file') +arg_parser.add_argument('-q', type=str, default='cic-eth', help='Task queue') +arg_parser.add_argument('-v', action='store_true', help='Be verbose') +arg_parser.add_argument('-vv', action='store_true', help='Be more verbose') +arg_parser.add_argument('user_dir', type=str, help='path to users export dir tree') +args = arg_parser.parse_args() + +if args.v: + logg.setLevel(logging.INFO) +elif args.vv: + logg.setLevel(logging.DEBUG) + +config_dir = args.c +config = Config(config_dir, os.environ.get('CONFINI_ENV_PREFIX')) +config.process() + +user_old_dir = os.path.join(args.user_dir, 'old') +os.stat(user_old_dir) + +db_configs = { + 'database': config.get('DATABASE_NAME'), + 'host': config.get('DATABASE_HOST'), + 'port': config.get('DATABASE_PORT'), + 'user': config.get('DATABASE_USER'), + 'password': config.get('DATABASE_PASSWORD') +} +celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL')) + +if __name__ == '__main__': + for x in os.walk(user_old_dir): + for y in x[2]: + + if y[len(y) - 5:] != '.json': + continue + + # handle ussd_data json object + if y[:15] == '_ussd_data.json': + filepath = os.path.join(x[0], y) + f = open(filepath, 'r') + try: + ussd_data = json.load(f) + except json.decoder.JSONDecodeError as e: + f.close() + logg.error('load error for {}: {}'.format(y, e)) + continue + f.close() + + s_set_ussd_data = celery.signature( + 'import_task.set_ussd_data', + [db_configs, ussd_data] + ) + s_set_ussd_data.apply_async(queue='cic-import-ussd') diff --git a/apps/contract-migration/scripts/create_import_pins.py b/apps/contract-migration/scripts/create_import_pins.py new file mode 100644 index 00000000..daebe3ba --- /dev/null +++ b/apps/contract-migration/scripts/create_import_pins.py @@ -0,0 +1,90 @@ +# standard imports +import argparse +import json +import logging +import os + +# third-party imports +import bcrypt +import celery +import confini +import phonenumbers +import random +from cic_types.models.person import Person +from cryptography.fernet import Fernet + +# local imports + + +logging.basicConfig(level=logging.WARNING) +logg = logging.getLogger() + +script_dir = os.path.realpath(os.path.dirname(__file__)) +default_config_dir = os.environ.get('CONFINI_DIR', os.path.join(script_dir, 'config')) + +arg_parser = argparse.ArgumentParser() +arg_parser.add_argument('-c', type=str, default=default_config_dir, help='Config dir') +arg_parser.add_argument('-v', action='store_true', help='Be verbose') +arg_parser.add_argument('-vv', action='store_true', help='Be more verbose') +arg_parser.add_argument('--userdir', type=str, help='path to users export dir tree') +arg_parser.add_argument('pins_dir', type=str, help='path to pin export dir tree') + + +args = arg_parser.parse_args() + +if args.v: + logg.setLevel(logging.INFO) +elif args.vv: + logg.setLevel(logging.DEBUG) + +config = confini.Config(args.c, os.environ.get('CONFINI_ENV_PREFIX')) +config.process() +logg.info('loaded config\n{}'.format(config)) + +celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL')) + +user_dir = args.userdir +pins_dir = args.pins_dir + + +def generate_password_hash(): + key = Fernet.generate_key() + fnt = Fernet(key) + pin = str(random.randint(1000, 9999)) + return fnt.encrypt(bcrypt.hashpw(pin.encode('utf-8'), bcrypt.gensalt())).decode() + + +user_old_dir = os.path.join(user_dir, 'old') +logg.debug(f'reading user data from: {user_old_dir}') + +pins_file = open(f'{pins_dir}/pins.csv', 'w') + +if __name__ == '__main__': + + for x in os.walk(user_old_dir): + for y in x[2]: + # skip non-json files + if y[len(y) - 5:] != '.json': + continue + + # define file path for + filepath = None + if y[:15] != '_ussd_data.json': + filepath = os.path.join(x[0], y) + f = open(filepath, 'r') + try: + o = json.load(f) + except json.decoder.JSONDecodeError as e: + f.close() + logg.error('load error for {}: {}'.format(y, e)) + continue + f.close() + u = Person.deserialize(o) + + phone_object = phonenumbers.parse(u.tel) + phone = phonenumbers.format_number(phone_object, phonenumbers.PhoneNumberFormat.E164) + password_hash = generate_password_hash() + pins_file.write(f'{phone},{password_hash}\n') + logg.info(f'Writing phone: {phone}, password_hash: {password_hash}') + + pins_file.close() diff --git a/apps/contract-migration/scripts/create_import_users.py b/apps/contract-migration/scripts/create_import_users.py index fbd919b3..ed111092 100644 --- a/apps/contract-migration/scripts/create_import_users.py +++ b/apps/contract-migration/scripts/create_import_users.py @@ -130,6 +130,7 @@ def genCats(): def genAmount(): return random.randint(0, gift_max) * gift_factor + def genDob(): dob_src = fake.date_of_birth(minimum_age=15) dob = {} @@ -168,8 +169,9 @@ def gen(): } p.location['area_name'] = city if random.randint(0, 1): - p.identities['latitude'] = (random.random() + 180) - 90 #fake.local_latitude() - p.identities['longitude'] = (random.random() + 360) - 180 #fake.local_latitude() + p.location['latitude'] = (random.random() + 180) - 90 #fake.local_latitude() + p.location['longitude'] = (random.random() + 360) - 180 #fake.local_latitude() + return (old_blockchain_checksum_address, phone, p) @@ -210,10 +212,20 @@ if __name__ == '__main__': print(o) + ussd_data = { + 'phone': phone, + 'is_activated': 1, + 'preferred_language': random.sample(['en', 'sw'], 1)[0], + 'is_disabled': False + } + d = prepareLocalFilePath(base_dir, uid) f = open('{}/{}'.format(d, uid + '.json'), 'w') json.dump(o.serialize(), f) f.close() + x = open('{}/{}'.format(d, uid + '_ussd_data.json'), 'w') + json.dump(ussd_data, x) + x.close() pidx = genPhoneIndex(phone) d = prepareLocalFilePath(os.path.join(user_dir, 'phone'), pidx) diff --git a/apps/contract-migration/scripts/verify.py b/apps/contract-migration/scripts/verify.py index 5d7ffc2d..6a526e02 100644 --- a/apps/contract-migration/scripts/verify.py +++ b/apps/contract-migration/scripts/verify.py @@ -1,52 +1,37 @@ # standard imports +import argparse +import copy +import hashlib +import json +import logging import os import sys -import logging -import time -import argparse -import sys -import re -import hashlib -import csv -import json import urllib -import copy -import uuid import urllib.request +import uuid # external imports import celery -import eth_abi import confini -from hexathon import ( - strip_0x, - add_0x, - ) -from chainsyncer.backend.memory import MemBackend -from chainsyncer.driver import HeadSyncer +import eth_abi from chainlib.chain import ChainSpec +from chainlib.eth.address import to_checksum_address from chainlib.eth.connection import EthHTTPConnection from chainlib.eth.constant import ZERO_ADDRESS -from chainlib.eth.block import ( - block_latest, - block_by_number, - Block, - ) -from chainlib.hash import keccak256_string_to_hex -from chainlib.eth.address import to_checksum_address from chainlib.eth.gas import ( - OverrideGasOracle, - balance, - ) + OverrideGasOracle, + balance, +) from chainlib.eth.tx import TxFactory +from chainlib.hash import keccak256_string_to_hex from chainlib.jsonrpc import jsonrpc_template -from chainlib.eth.error import EthException from cic_types.models.person import ( - Person, - generate_metadata_pointer, - ) + Person, + generate_metadata_pointer, +) from erc20_faucet import Faucet from eth_erc20 import ERC20 +from hexathon.parse import strip_0x, add_0x logging.basicConfig(level=logging.WARNING) logg = logging.getLogger() @@ -72,6 +57,7 @@ eth_tests = [ phone_tests = [ 'ussd', + 'ussd_pins' ] all_tests = eth_tests + custodial_tests + metadata_tests + phone_tests @@ -171,6 +157,39 @@ if logg.isEnabledFor(logging.DEBUG): outfunc = logg.debug +def send_ussd_request(address, data_dir): + upper_address = strip_0x(address).upper() + f = open(os.path.join( + data_dir, + 'new', + upper_address[:2], + upper_address[2:4], + upper_address + '.json', + ), 'r' + ) + o = json.load(f) + f.close() + + p = Person.deserialize(o) + phone = p.tel + + session = uuid.uuid4().hex + data = { + 'sessionId': session, + 'serviceCode': config.get('APP_SERVICE_CODE'), + 'phoneNumber': phone, + 'text': '', + } + + req = urllib.request.Request(config.get('_USSD_PROVIDER')) + data_str = json.dumps(data) + data_bytes = data_str.encode('utf-8') + req.add_header('Content-Type', 'application/json') + req.data = data_bytes + response = urllib.request.urlopen(req) + return response.read().decode('utf-8') + + class VerifierState: def __init__(self, item_keys, active_tests=None): @@ -354,42 +373,18 @@ class Verifier: def verify_ussd(self, address, balance=None): - upper_address = strip_0x(address).upper() - f = open(os.path.join( - self.data_dir, - 'new', - upper_address[:2], - upper_address[2:4], - upper_address + '.json', - ), 'r' - ) - o = json.load(f) - f.close() - - p = Person.deserialize(o) - phone = p.tel - - session = uuid.uuid4().hex - data = { - 'sessionId': session, - 'serviceCode': config.get('APP_SERVICE_CODE'), - 'phoneNumber': phone, - 'text': config.get('APP_SERVICE_CODE'), - } - - req = urllib.request.Request(config.get('_USSD_PROVIDER')) - data_str = json.dumps(data) - data_bytes = data_str.encode('utf-8') - req.add_header('Content-Type', 'application/json') - req.data = data_bytes - response = urllib.request.urlopen(req) - response_data = response.read().decode('utf-8') + response_data = send_ussd_request(address, self.data_dir) state = response_data[:3] out = response_data[4:] m = '{} {}'.format(state, out[:7]) if m != 'CON Welcome': raise VerifierError(response_data, 'ussd') + def verify_ussd_pins(self, address, balance): + response_data = send_ussd_request(address, self.data_dir) + if response_data[:11] != 'CON Balance': + raise VerifierError(response_data, 'pins') + def verify(self, address, balance, debug_stem=None):