Philip/import pins script

This commit is contained in:
Philip Wafula 2021-05-15 07:40:34 +00:00
parent 0b4d8d5937
commit 1cb172b8bf
11 changed files with 452 additions and 121 deletions

View File

@ -120,7 +120,7 @@ class MetadataRequestsHandler(Metadata):
data = json.loads(response_data.decode('utf-8'))
if result.status_code == 200 and self.cic_type == ':cic.person':
person = Person()
deserialized_person = person.deserialize(person_data=json.loads(data))
deserialized_person = person.deserialize(person_data=data)
data = json.dumps(deserialized_person.serialize())
cache_data(self.metadata_pointer, data=data)
logg.debug(f'caching: {data} with key: {self.metadata_pointer}')

View File

@ -325,6 +325,14 @@ def process_menu_interaction_requests(chain_str: str,
# get user
user = Account.session.query(Account).filter_by(phone_number=phone_number).first()
# retrieve and cache user's metadata
blockchain_address = user.blockchain_address
s_query_person_metadata = celery.signature(
'cic_ussd.tasks.metadata.query_person_metadata',
[blockchain_address]
)
s_query_person_metadata.apply_async(queue='cic-ussd')
# find any existing ussd session
existing_ussd_session = UssdSession.session.query(UssdSession).filter_by(
external_session_id=external_session_id).first()

View File

@ -371,13 +371,6 @@ def process_start_menu(display_key: str, user: Account):
# get operational balance
operational_balance = compute_operational_balance(balances=balances_data)
# retrieve and cache account's metadata
s_query_person_metadata = celery.signature(
'cic_ussd.tasks.metadata.query_person_metadata',
[blockchain_address]
)
s_query_person_metadata.apply_async(queue='cic-ussd')
# retrieve and cache account's statement
retrieve_account_statement(blockchain_address=blockchain_address)

View File

@ -89,7 +89,12 @@ After this step is run, you can find top-level ethereum addresses (like the cic
#### Custodial provisions
response_data = send_ussd_request(address, self.data_dir)
state = response_data[:3]
out = response_data[4:]
m = '{} {}'.format(state, out[:7])
if m != 'CON Welcome':
raise VerifierError(response_data, 'ussd')
This step is _only_ needed if you are importing using `cic_eth` or `cic_ussd`
`RUN_MASK=2 docker-compose up contract-migration`
@ -104,8 +109,8 @@ If importing using `cic_eth` or `cic_ussd` also run:
* cic-eth-retrier
If importing using `cic_ussd` also run:
* cic-ussd-tasker
* cic-ussd-server
* cic-user-tasker
* cic-user-ussd-server
* cic-notify-tasker
If metadata is to be imported, also run:
@ -169,6 +174,26 @@ In second terminal:
`python cic_ussd/import_users.py -v -c config out`
##### Importing pins and ussd data (optional)
Once the user imports are complete the next step should be importing the user's pins and auxiliary ussd data. This can be done in 3 steps:
In one terminal run:
`python create_import_pins.py -c config -v --userdir <path to the users export dir tree> pinsdir <path to pin export dir tree>`
This script will recursively walk through all the directories defining user data in the users export directory and generate a csv file containing phone numbers and password hashes generated using fernet in a manner reflecting the nature of said hashes in the old system.
This csv file will be stored in the pins export dir defined as the positional argument.
Once the creation of the pins file is complete, proceed to import the pins and ussd data as follows:
- To import the pins:
`python cic_ussd/import_pins.py -c config -v pinsdir <path to pin export dir tree>`
- To import ussd data:
`python cic_ussd/import_ussd_data.py -c config -v userdir <path to the users export dir tree>`
The balance script is a celery task worker, and will not exit by itself in its current version. However, after it's done doing its job, you will find "reached nonce ... exiting" among the last lines of the log.
The connection parameters for the `cic-ussd-server` is currently _hardcoded_ in the `import_users.py` script file.

View File

@ -0,0 +1,70 @@
# standard import
import argparse
import csv
import logging
import os
# third-party imports
import celery
import confini
# local imports
from import_task import *
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
default_config_dir = './config'
arg_parser = argparse.ArgumentParser()
arg_parser.add_argument('-c', type=str, default=default_config_dir, help='config root to use')
arg_parser.add_argument('--env-prefix',
default=os.environ.get('CONFINI_ENV_PREFIX'),
dest='env_prefix',
type=str,
help='environment prefix for variables to overwrite configuration')
arg_parser.add_argument('-q', type=str, default='cic-import-ussd', help='celery queue to submit transaction tasks to')
arg_parser.add_argument('-v', help='be verbose', action='store_true')
arg_parser.add_argument('-vv', help='be more verbose', action='store_true')
arg_parser.add_argument('pins_dir', default='out', type=str, help='user export directory')
args = arg_parser.parse_args()
# set log levels
if args.v:
logg.setLevel(logging.INFO)
elif args.vv:
logg.setLevel(logging.DEBUG)
# process configs
config_dir = args.c
config = confini.Config(config_dir, os.environ.get('CONFINI_ENV_PREFIX'))
config.process()
celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL'))
db_configs = {
'database': config.get('DATABASE_NAME'),
'host': config.get('DATABASE_HOST'),
'port': config.get('DATABASE_PORT'),
'user': config.get('DATABASE_USER'),
'password': config.get('DATABASE_PASSWORD')
}
def main():
with open(f'{args.pins_dir}/pins.csv') as pins_file:
phone_to_pins = [tuple(row) for row in csv.reader(pins_file)]
s_import_pins = celery.signature(
'import_task.set_pins',
(db_configs, phone_to_pins),
queue=args.q
)
s_import_pins.apply_async()
argv = ['worker', '-Q', 'cic-import-ussd', '--loglevel=DEBUG']
celery_app.worker_main(argv)
if __name__ == '__main__':
main()

View File

@ -8,6 +8,8 @@ import json
# external imports
import celery
import psycopg2
from psycopg2 import extras
from hexathon import (
strip_0x,
add_0x,
@ -53,7 +55,7 @@ class MetadataTask(ImportTask):
def meta_url(self):
scheme = 'http'
if self.meta_ssl:
scheme += s
scheme += 's'
url = urllib.parse.urlparse('{}://{}:{}/{}'.format(scheme, self.meta_host, self.meta_port, self.meta_path))
return urllib.parse.urlunparse(url)
@ -91,7 +93,6 @@ def resolve_phone(self, phone):
def generate_metadata(self, address, phone):
old_address = old_address_from_phone(self.import_dir, phone)
logg.debug('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> foo')
logg.debug('address {}'.format(address))
old_address_upper = strip_0x(old_address).upper()
metadata_path = '{}/old/{}/{}/{}.json'.format(
@ -216,3 +217,60 @@ def send_txs(self, nonce):
return nonce
@celery_app.task
def set_pins(config: dict, phone_to_pins: list):
# define db connection
db_conn = psycopg2.connect(
database=config.get('database'),
host=config.get('host'),
port=config.get('port'),
user=config.get('user'),
password=config.get('password')
)
db_cursor = db_conn.cursor()
# update db
for element in phone_to_pins:
sql = 'UPDATE account SET password_hash = %s WHERE phone_number = %s'
db_cursor.execute(sql, (element[1], element[0]))
logg.debug(f'Updating: {element[0]} with: {element[1]}')
# commit changes
db_conn.commit()
# close connections
db_cursor.close()
db_conn.close()
@celery_app.task
def set_ussd_data(config: dict, ussd_data: dict):
# define db connection
db_conn = psycopg2.connect(
database=config.get('database'),
host=config.get('host'),
port=config.get('port'),
user=config.get('user'),
password=config.get('password')
)
db_cursor = db_conn.cursor()
# process ussd_data
account_status = 1
if ussd_data['is_activated'] == 1:
account_status = 2
preferred_language = ussd_data['preferred_language']
phone_number = ussd_data['phone']
sql = 'UPDATE account SET account_status = %s, preferred_language = %s WHERE phone_number = %s'
db_cursor.execute(sql, (account_status, preferred_language, phone_number))
# commit changes
db_conn.commit()
# close connections
db_cursor.close()
db_conn.close()

View File

@ -87,6 +87,13 @@ chain_str = str(chain_spec)
batch_size = args.batch_size
batch_delay = args.batch_delay
db_configs = {
'database': config.get('DATABASE_NAME'),
'host': config.get('DATABASE_HOST'),
'port': config.get('DATABASE_PORT'),
'user': config.get('DATABASE_USER'),
'password': config.get('DATABASE_PASSWORD')
}
def build_ussd_request(phone, host, port, service_code, username, password, ssl=False):
@ -135,57 +142,60 @@ if __name__ == '__main__':
for y in x[2]:
if y[len(y)-5:] != '.json':
continue
filepath = os.path.join(x[0], y)
f = open(filepath, 'r')
try:
o = json.load(f)
except json.decoder.JSONDecodeError as e:
# handle json containing person object
filepath = None
if y[:15] != '_ussd_data.json':
filepath = os.path.join(x[0], y)
f = open(filepath, 'r')
try:
o = json.load(f)
except json.decoder.JSONDecodeError as e:
f.close()
logg.error('load error for {}: {}'.format(y, e))
continue
f.close()
logg.error('load error for {}: {}'.format(y, e))
continue
f.close()
u = Person.deserialize(o)
u = Person.deserialize(o)
new_address = register_ussd(i, u)
new_address = register_ussd(i, u)
phone_object = phonenumbers.parse(u.tel)
phone = phonenumbers.format_number(phone_object, phonenumbers.PhoneNumberFormat.E164)
phone_object = phonenumbers.parse(u.tel)
phone = phonenumbers.format_number(phone_object, phonenumbers.PhoneNumberFormat.E164)
s_phone = celery.signature(
'import_task.resolve_phone',
[
phone,
],
queue='cic-import-ussd',
)
s_phone = celery.signature(
'import_task.resolve_phone',
[
phone,
],
queue='cic-import-ussd',
)
s_meta = celery.signature(
'import_task.generate_metadata',
[
phone,
],
queue='cic-import-ussd',
)
s_meta = celery.signature(
'import_task.generate_metadata',
[
phone,
],
queue='cic-import-ussd',
)
s_balance = celery.signature(
'import_task.opening_balance_tx',
[
phone,
i,
],
queue='cic-import-ussd',
)
s_balance = celery.signature(
'import_task.opening_balance_tx',
[
phone,
i,
],
queue='cic-import-ussd',
)
s_meta.link(s_balance)
s_phone.link(s_meta)
s_phone.apply_async(countdown=7) # block time plus a bit of time for ussd processing
s_meta.link(s_balance)
s_phone.link(s_meta)
# block time plus a bit of time for ussd processing
s_phone.apply_async(countdown=7)
i += 1
sys.stdout.write('imported {} {}'.format(i, u).ljust(200) + "\r")
i += 1
sys.stdout.write('imported {} {}'.format(i, u).ljust(200) + "\r")
j += 1
if j == batch_size:
time.sleep(batch_delay)
j = 0
j += 1
if j == batch_size:
time.sleep(batch_delay)
j = 0
#fi.close()

View File

@ -0,0 +1,70 @@
# standard imports
import argparse
import json
import logging
import os
# external imports
import celery
from confini import Config
# local imports
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
default_config_dir = '/usr/local/etc/cic'
arg_parser = argparse.ArgumentParser()
arg_parser.add_argument('-c', type=str, default=default_config_dir, help='config file')
arg_parser.add_argument('-q', type=str, default='cic-eth', help='Task queue')
arg_parser.add_argument('-v', action='store_true', help='Be verbose')
arg_parser.add_argument('-vv', action='store_true', help='Be more verbose')
arg_parser.add_argument('user_dir', type=str, help='path to users export dir tree')
args = arg_parser.parse_args()
if args.v:
logg.setLevel(logging.INFO)
elif args.vv:
logg.setLevel(logging.DEBUG)
config_dir = args.c
config = Config(config_dir, os.environ.get('CONFINI_ENV_PREFIX'))
config.process()
user_old_dir = os.path.join(args.user_dir, 'old')
os.stat(user_old_dir)
db_configs = {
'database': config.get('DATABASE_NAME'),
'host': config.get('DATABASE_HOST'),
'port': config.get('DATABASE_PORT'),
'user': config.get('DATABASE_USER'),
'password': config.get('DATABASE_PASSWORD')
}
celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL'))
if __name__ == '__main__':
for x in os.walk(user_old_dir):
for y in x[2]:
if y[len(y) - 5:] != '.json':
continue
# handle ussd_data json object
if y[:15] == '_ussd_data.json':
filepath = os.path.join(x[0], y)
f = open(filepath, 'r')
try:
ussd_data = json.load(f)
except json.decoder.JSONDecodeError as e:
f.close()
logg.error('load error for {}: {}'.format(y, e))
continue
f.close()
s_set_ussd_data = celery.signature(
'import_task.set_ussd_data',
[db_configs, ussd_data]
)
s_set_ussd_data.apply_async(queue='cic-import-ussd')

View File

@ -0,0 +1,90 @@
# standard imports
import argparse
import json
import logging
import os
# third-party imports
import bcrypt
import celery
import confini
import phonenumbers
import random
from cic_types.models.person import Person
from cryptography.fernet import Fernet
# local imports
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
script_dir = os.path.realpath(os.path.dirname(__file__))
default_config_dir = os.environ.get('CONFINI_DIR', os.path.join(script_dir, 'config'))
arg_parser = argparse.ArgumentParser()
arg_parser.add_argument('-c', type=str, default=default_config_dir, help='Config dir')
arg_parser.add_argument('-v', action='store_true', help='Be verbose')
arg_parser.add_argument('-vv', action='store_true', help='Be more verbose')
arg_parser.add_argument('--userdir', type=str, help='path to users export dir tree')
arg_parser.add_argument('pins_dir', type=str, help='path to pin export dir tree')
args = arg_parser.parse_args()
if args.v:
logg.setLevel(logging.INFO)
elif args.vv:
logg.setLevel(logging.DEBUG)
config = confini.Config(args.c, os.environ.get('CONFINI_ENV_PREFIX'))
config.process()
logg.info('loaded config\n{}'.format(config))
celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL'))
user_dir = args.userdir
pins_dir = args.pins_dir
def generate_password_hash():
key = Fernet.generate_key()
fnt = Fernet(key)
pin = str(random.randint(1000, 9999))
return fnt.encrypt(bcrypt.hashpw(pin.encode('utf-8'), bcrypt.gensalt())).decode()
user_old_dir = os.path.join(user_dir, 'old')
logg.debug(f'reading user data from: {user_old_dir}')
pins_file = open(f'{pins_dir}/pins.csv', 'w')
if __name__ == '__main__':
for x in os.walk(user_old_dir):
for y in x[2]:
# skip non-json files
if y[len(y) - 5:] != '.json':
continue
# define file path for
filepath = None
if y[:15] != '_ussd_data.json':
filepath = os.path.join(x[0], y)
f = open(filepath, 'r')
try:
o = json.load(f)
except json.decoder.JSONDecodeError as e:
f.close()
logg.error('load error for {}: {}'.format(y, e))
continue
f.close()
u = Person.deserialize(o)
phone_object = phonenumbers.parse(u.tel)
phone = phonenumbers.format_number(phone_object, phonenumbers.PhoneNumberFormat.E164)
password_hash = generate_password_hash()
pins_file.write(f'{phone},{password_hash}\n')
logg.info(f'Writing phone: {phone}, password_hash: {password_hash}')
pins_file.close()

View File

@ -130,6 +130,7 @@ def genCats():
def genAmount():
return random.randint(0, gift_max) * gift_factor
def genDob():
dob_src = fake.date_of_birth(minimum_age=15)
dob = {}
@ -168,8 +169,9 @@ def gen():
}
p.location['area_name'] = city
if random.randint(0, 1):
p.identities['latitude'] = (random.random() + 180) - 90 #fake.local_latitude()
p.identities['longitude'] = (random.random() + 360) - 180 #fake.local_latitude()
p.location['latitude'] = (random.random() + 180) - 90 #fake.local_latitude()
p.location['longitude'] = (random.random() + 360) - 180 #fake.local_latitude()
return (old_blockchain_checksum_address, phone, p)
@ -210,10 +212,20 @@ if __name__ == '__main__':
print(o)
ussd_data = {
'phone': phone,
'is_activated': 1,
'preferred_language': random.sample(['en', 'sw'], 1)[0],
'is_disabled': False
}
d = prepareLocalFilePath(base_dir, uid)
f = open('{}/{}'.format(d, uid + '.json'), 'w')
json.dump(o.serialize(), f)
f.close()
x = open('{}/{}'.format(d, uid + '_ussd_data.json'), 'w')
json.dump(ussd_data, x)
x.close()
pidx = genPhoneIndex(phone)
d = prepareLocalFilePath(os.path.join(user_dir, 'phone'), pidx)

View File

@ -1,52 +1,37 @@
# standard imports
import argparse
import copy
import hashlib
import json
import logging
import os
import sys
import logging
import time
import argparse
import sys
import re
import hashlib
import csv
import json
import urllib
import copy
import uuid
import urllib.request
import uuid
# external imports
import celery
import eth_abi
import confini
from hexathon import (
strip_0x,
add_0x,
)
from chainsyncer.backend.memory import MemBackend
from chainsyncer.driver import HeadSyncer
import eth_abi
from chainlib.chain import ChainSpec
from chainlib.eth.address import to_checksum_address
from chainlib.eth.connection import EthHTTPConnection
from chainlib.eth.constant import ZERO_ADDRESS
from chainlib.eth.block import (
block_latest,
block_by_number,
Block,
)
from chainlib.hash import keccak256_string_to_hex
from chainlib.eth.address import to_checksum_address
from chainlib.eth.gas import (
OverrideGasOracle,
balance,
)
OverrideGasOracle,
balance,
)
from chainlib.eth.tx import TxFactory
from chainlib.hash import keccak256_string_to_hex
from chainlib.jsonrpc import jsonrpc_template
from chainlib.eth.error import EthException
from cic_types.models.person import (
Person,
generate_metadata_pointer,
)
Person,
generate_metadata_pointer,
)
from erc20_faucet import Faucet
from eth_erc20 import ERC20
from hexathon.parse import strip_0x, add_0x
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
@ -72,6 +57,7 @@ eth_tests = [
phone_tests = [
'ussd',
'ussd_pins'
]
all_tests = eth_tests + custodial_tests + metadata_tests + phone_tests
@ -171,6 +157,39 @@ if logg.isEnabledFor(logging.DEBUG):
outfunc = logg.debug
def send_ussd_request(address, data_dir):
upper_address = strip_0x(address).upper()
f = open(os.path.join(
data_dir,
'new',
upper_address[:2],
upper_address[2:4],
upper_address + '.json',
), 'r'
)
o = json.load(f)
f.close()
p = Person.deserialize(o)
phone = p.tel
session = uuid.uuid4().hex
data = {
'sessionId': session,
'serviceCode': config.get('APP_SERVICE_CODE'),
'phoneNumber': phone,
'text': '',
}
req = urllib.request.Request(config.get('_USSD_PROVIDER'))
data_str = json.dumps(data)
data_bytes = data_str.encode('utf-8')
req.add_header('Content-Type', 'application/json')
req.data = data_bytes
response = urllib.request.urlopen(req)
return response.read().decode('utf-8')
class VerifierState:
def __init__(self, item_keys, active_tests=None):
@ -354,42 +373,18 @@ class Verifier:
def verify_ussd(self, address, balance=None):
upper_address = strip_0x(address).upper()
f = open(os.path.join(
self.data_dir,
'new',
upper_address[:2],
upper_address[2:4],
upper_address + '.json',
), 'r'
)
o = json.load(f)
f.close()
p = Person.deserialize(o)
phone = p.tel
session = uuid.uuid4().hex
data = {
'sessionId': session,
'serviceCode': config.get('APP_SERVICE_CODE'),
'phoneNumber': phone,
'text': config.get('APP_SERVICE_CODE'),
}
req = urllib.request.Request(config.get('_USSD_PROVIDER'))
data_str = json.dumps(data)
data_bytes = data_str.encode('utf-8')
req.add_header('Content-Type', 'application/json')
req.data = data_bytes
response = urllib.request.urlopen(req)
response_data = response.read().decode('utf-8')
response_data = send_ussd_request(address, self.data_dir)
state = response_data[:3]
out = response_data[4:]
m = '{} {}'.format(state, out[:7])
if m != 'CON Welcome':
raise VerifierError(response_data, 'ussd')
def verify_ussd_pins(self, address, balance):
response_data = send_ussd_request(address, self.data_dir)
if response_data[:11] != 'CON Balance':
raise VerifierError(response_data, 'pins')
def verify(self, address, balance, debug_stem=None):