Compare commits

...

16 Commits

Author SHA1 Message Date
PhilipWafula 6a02223189
Updates verification steps. 2021-05-04 00:01:59 +03:00
PhilipWafula f980f9210a
Removes meta query. 2021-05-03 23:47:03 +03:00
PhilipWafula 4f1c15f569
Refactors query function to handle new meta result format. 2021-05-03 23:46:30 +03:00
PhilipWafula 48ad610e20
Moves query for user metadata higher up the application layer. 2021-05-03 23:43:07 +03:00
PhilipWafula 0c769115a1
Adds script to import ussd data. 2021-05-03 23:41:13 +03:00
PhilipWafula fbc46f8ff2
Merge branch 'master' of gitlab.com:grassrootseconomics/cic-internal-integration into philip/import-pins-script 2021-05-03 11:36:52 +03:00
PhilipWafula a7a51a8728
Updates readme with changes to import patterns. 2021-05-03 11:36:41 +03:00
PhilipWafula 32e08dfddf
Refactors handling import of users. 2021-05-03 11:36:19 +03:00
PhilipWafula 0d57fb8679
Adds task to handle importing ussd data. 2021-05-03 11:35:29 +03:00
PhilipWafula c75c2ec630
Separates handling of ussd and user json data in the users export dir tree. 2021-05-03 11:31:14 +03:00
PhilipWafula a9a04d3caa
Merge branch 'master' of gitlab.com:grassrootseconomics/cic-internal-integration into philip/import-pins-script 2021-05-02 08:13:26 +03:00
PhilipWafula daa022c3d8
Merge "origin/master" into "philip/import-pins-script" 2021-04-29 12:38:05 +03:00
PhilipWafula 274f320b03
Adds verifier step for ussd pin imports. 2021-04-26 21:52:27 +03:00
PhilipWafula 3ae9b3e4eb
Refactors to row by row update. 2021-04-26 13:09:23 +03:00
PhilipWafula f544e80b31
Adds scripts to handle pin imports. 2021-04-21 01:37:14 +03:00
PhilipWafula 027b0457bf
Adds task to apply pins to account table. 2021-04-21 01:36:12 +03:00
11 changed files with 454 additions and 120 deletions

View File

@ -120,7 +120,7 @@ class MetadataRequestsHandler(Metadata):
data = json.loads(response_data.decode('utf-8'))
if result.status_code == 200 and self.cic_type == ':cic.person':
person = Person()
deserialized_person = person.deserialize(person_data=json.loads(data))
deserialized_person = person.deserialize(person_data=data)
data = json.dumps(deserialized_person.serialize())
cache_data(self.metadata_pointer, data=data)
logg.debug(f'caching: {data} with key: {self.metadata_pointer}')

View File

@ -325,6 +325,14 @@ def process_menu_interaction_requests(chain_str: str,
# get user
user = Account.session.query(Account).filter_by(phone_number=phone_number).first()
# retrieve and cache user's metadata
blockchain_address = user.blockchain_address
s_query_person_metadata = celery.signature(
'cic_ussd.tasks.metadata.query_person_metadata',
[blockchain_address]
)
s_query_person_metadata.apply_async(queue='cic-ussd')
# find any existing ussd session
existing_ussd_session = UssdSession.session.query(UssdSession).filter_by(
external_session_id=external_session_id).first()

View File

@ -371,13 +371,6 @@ def process_start_menu(display_key: str, user: Account):
# get operational balance
operational_balance = compute_operational_balance(balances=balances_data)
# retrieve and cache account's metadata
s_query_person_metadata = celery.signature(
'cic_ussd.tasks.metadata.query_person_metadata',
[blockchain_address]
)
s_query_person_metadata.apply_async(queue='cic-ussd')
# retrieve and cache account's statement
retrieve_account_statement(blockchain_address=blockchain_address)

View File

@ -104,8 +104,8 @@ If importing using `cic_eth` or `cic_ussd` also run:
* cic-eth-retrier
If importing using `cic_ussd` also run:
* cic-ussd-tasker
* cic-ussd-server
* cic-user-tasker
* cic-user-ussd-server
* cic-notify-tasker
If metadata is to be imported, also run:
@ -169,6 +169,24 @@ In second terminal:
`python cic_ussd/import_users.py -v -c config out`
Once the user imports are complete the next step should be importing the user's pins and auxiliary ussd data. This can be done in 3 steps:
In one terminal run:
`python create_import_pins.py -c config -v --userdir <path to the users export dir tree> pinsdir <path to pin export dir tree>`
This script will recursively walk through all the directories defining user data in the users export directory and generate a csv file containing phone numbers and password hashes generated using fernet in a manner reflecting the nature of said hashes in the current sempo system.
This csv file will be stored in the pins export dir defined as the positional argument.
Once the creation of the pins file is complete, proceed to import the pins and ussd data as follows:
- To import the pins:
`python cic_ussd/import_pins.py -c config -v pinsdir <path to pin export dir tree>`
- To import ussd data:
`python cic_ussd/import_ussd_data.py -c config -v userdir <path to the users export dir tree>`
The balance script is a celery task worker, and will not exit by itself in its current version. However, after it's done doing its job, you will find "reached nonce ... exiting" among the last lines of the log.
The connection parameters for the `cic-ussd-server` is currently _hardcoded_ in the `import_users.py` script file.

View File

@ -0,0 +1,70 @@
# standard import
import argparse
import csv
import logging
import os
# third-party imports
import celery
import confini
# local imports
from import_task import *
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
default_config_dir = './config'
arg_parser = argparse.ArgumentParser()
arg_parser.add_argument('-c', type=str, default=default_config_dir, help='config root to use')
arg_parser.add_argument('--env-prefix',
default=os.environ.get('CONFINI_ENV_PREFIX'),
dest='env_prefix',
type=str,
help='environment prefix for variables to overwrite configuration')
arg_parser.add_argument('-q', type=str, default='cic-import-ussd', help='celery queue to submit transaction tasks to')
arg_parser.add_argument('-v', help='be verbose', action='store_true')
arg_parser.add_argument('-vv', help='be more verbose', action='store_true')
arg_parser.add_argument('pins_dir', default='out', type=str, help='user export directory')
args = arg_parser.parse_args()
# set log levels
if args.v:
logg.setLevel(logging.INFO)
elif args.vv:
logg.setLevel(logging.DEBUG)
# process configs
config_dir = args.c
config = confini.Config(config_dir, os.environ.get('CONFINI_ENV_PREFIX'))
config.process()
celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL'))
db_configs = {
'database': config.get('DATABASE_NAME'),
'host': config.get('DATABASE_HOST'),
'port': config.get('DATABASE_PORT'),
'user': config.get('DATABASE_USER'),
'password': config.get('DATABASE_PASSWORD')
}
def main():
with open(f'{args.pins_dir}/pins.csv') as pins_file:
phone_to_pins = [tuple(row) for row in csv.reader(pins_file)]
s_import_pins = celery.signature(
'import_task.set_pins',
(db_configs, phone_to_pins),
queue=args.q
)
s_import_pins.apply_async()
argv = ['worker', '-Q', 'cic-import-ussd', '--loglevel=DEBUG']
celery_app.worker_main(argv)
if __name__ == '__main__':
main()

View File

@ -8,6 +8,8 @@ import json
# external imports
import celery
import psycopg2
from psycopg2 import extras
from hexathon import (
strip_0x,
add_0x,
@ -53,7 +55,7 @@ class MetadataTask(ImportTask):
def meta_url(self):
scheme = 'http'
if self.meta_ssl:
scheme += s
scheme += 's'
url = urllib.parse.urlparse('{}://{}:{}/{}'.format(scheme, self.meta_host, self.meta_port, self.meta_path))
return urllib.parse.urlunparse(url)
@ -91,7 +93,6 @@ def resolve_phone(self, phone):
def generate_metadata(self, address, phone):
old_address = old_address_from_phone(self.import_dir, phone)
logg.debug('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> foo')
logg.debug('address {}'.format(address))
old_address_upper = strip_0x(old_address).upper()
metadata_path = '{}/old/{}/{}/{}.json'.format(
@ -216,3 +217,60 @@ def send_txs(self, nonce):
return nonce
@celery_app.task
def set_pins(config: dict, phone_to_pins: list):
# define db connection
db_conn = psycopg2.connect(
database=config.get('database'),
host=config.get('host'),
port=config.get('port'),
user=config.get('user'),
password=config.get('password')
)
db_cursor = db_conn.cursor()
# update db
for element in phone_to_pins:
sql = 'UPDATE account SET password_hash = %s WHERE phone_number = %s'
db_cursor.execute(sql, (element[1], element[0]))
logg.debug(f'Updating: {element[0]} with: {element[1]}')
# commit changes
db_conn.commit()
# close connections
db_cursor.close()
db_conn.close()
@celery_app.task
def set_ussd_data(config: dict, ussd_data: dict):
# define db connection
db_conn = psycopg2.connect(
database=config.get('database'),
host=config.get('host'),
port=config.get('port'),
user=config.get('user'),
password=config.get('password')
)
db_cursor = db_conn.cursor()
# process ussd_data
account_status = 1
if ussd_data['is_activated'] == 1:
account_status = 2
preferred_language = ussd_data['preferred_language']
phone_number = ussd_data['phone']
sql = 'UPDATE account SET account_status = %s, preferred_language = %s WHERE phone_number = %s'
db_cursor.execute(sql, (account_status, preferred_language, phone_number))
# commit changes
db_conn.commit()
# close connections
db_cursor.close()
db_conn.close()

View File

@ -87,6 +87,13 @@ chain_str = str(chain_spec)
batch_size = args.batch_size
batch_delay = args.batch_delay
db_configs = {
'database': config.get('DATABASE_NAME'),
'host': config.get('DATABASE_HOST'),
'port': config.get('DATABASE_PORT'),
'user': config.get('DATABASE_USER'),
'password': config.get('DATABASE_PASSWORD')
}
def build_ussd_request(phone, host, port, service_code, username, password, ssl=False):
@ -135,57 +142,60 @@ if __name__ == '__main__':
for y in x[2]:
if y[len(y)-5:] != '.json':
continue
filepath = os.path.join(x[0], y)
f = open(filepath, 'r')
try:
o = json.load(f)
except json.decoder.JSONDecodeError as e:
# handle json containing person object
filepath = None
if y != 'ussd_data.json':
filepath = os.path.join(x[0], y)
f = open(filepath, 'r')
try:
o = json.load(f)
except json.decoder.JSONDecodeError as e:
f.close()
logg.error('load error for {}: {}'.format(y, e))
continue
f.close()
logg.error('load error for {}: {}'.format(y, e))
continue
f.close()
u = Person.deserialize(o)
u = Person.deserialize(o)
new_address = register_ussd(i, u)
new_address = register_ussd(i, u)
phone_object = phonenumbers.parse(u.tel)
phone = phonenumbers.format_number(phone_object, phonenumbers.PhoneNumberFormat.E164)
phone_object = phonenumbers.parse(u.tel)
phone = phonenumbers.format_number(phone_object, phonenumbers.PhoneNumberFormat.E164)
s_phone = celery.signature(
'import_task.resolve_phone',
[
phone,
],
queue='cic-import-ussd',
)
s_phone = celery.signature(
'import_task.resolve_phone',
[
phone,
],
queue='cic-import-ussd',
)
s_meta = celery.signature(
'import_task.generate_metadata',
[
phone,
],
queue='cic-import-ussd',
)
s_meta = celery.signature(
'import_task.generate_metadata',
[
phone,
],
queue='cic-import-ussd',
)
s_balance = celery.signature(
'import_task.opening_balance_tx',
[
phone,
i,
],
queue='cic-import-ussd',
)
s_balance = celery.signature(
'import_task.opening_balance_tx',
[
phone,
i,
],
queue='cic-import-ussd',
)
s_meta.link(s_balance)
s_phone.link(s_meta)
s_phone.apply_async(countdown=7) # block time plus a bit of time for ussd processing
s_meta.link(s_balance)
s_phone.link(s_meta)
# block time plus a bit of time for ussd processing
s_phone.apply_async(countdown=7)
i += 1
sys.stdout.write('imported {} {}'.format(i, u).ljust(200) + "\r")
j += 1
if j == batch_size:
time.sleep(batch_delay)
j = 0
i += 1
sys.stdout.write('imported {} {}'.format(i, u).ljust(200) + "\r")
j += 1
if j == batch_size:
time.sleep(batch_delay)
j = 0
#fi.close()

View File

@ -0,0 +1,70 @@
# standard imports
import argparse
import json
import logging
import os
# external imports
import celery
from confini import Config
# local imports
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
default_config_dir = '/usr/local/etc/cic'
arg_parser = argparse.ArgumentParser()
arg_parser.add_argument('-c', type=str, default=default_config_dir, help='config file')
arg_parser.add_argument('-q', type=str, default='cic-eth', help='Task queue')
arg_parser.add_argument('-v', action='store_true', help='Be verbose')
arg_parser.add_argument('-vv', action='store_true', help='Be more verbose')
arg_parser.add_argument('user_dir', type=str, help='path to users export dir tree')
args = arg_parser.parse_args()
if args.v:
logg.setLevel(logging.INFO)
elif args.vv:
logg.setLevel(logging.DEBUG)
config_dir = args.c
config = Config(config_dir, os.environ.get('CONFINI_ENV_PREFIX'))
config.process()
user_old_dir = os.path.join(args.user_dir, 'old')
os.stat(user_old_dir)
db_configs = {
'database': config.get('DATABASE_NAME'),
'host': config.get('DATABASE_HOST'),
'port': config.get('DATABASE_PORT'),
'user': config.get('DATABASE_USER'),
'password': config.get('DATABASE_PASSWORD')
}
celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL'))
if __name__ == '__main__':
for x in os.walk(user_old_dir):
for y in x[2]:
if y[len(y) - 5:] != '.json':
continue
# handle ussd_data json object
if y == 'ussd_data.json':
filepath = os.path.join(x[0], y)
f = open(filepath, 'r')
try:
ussd_data = json.load(f)
except json.decoder.JSONDecodeError as e:
f.close()
logg.error('load error for {}: {}'.format(y, e))
continue
f.close()
s_set_ussd_data = celery.signature(
'import_task.set_ussd_data',
[db_configs, ussd_data]
)
s_set_ussd_data.apply_async(queue='cic-import-ussd')

View File

@ -0,0 +1,90 @@
# standard imports
import argparse
import json
import logging
import os
# third-party imports
import bcrypt
import celery
import confini
import phonenumbers
import random
from cic_types.models.person import Person
from cryptography.fernet import Fernet
# local imports
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
script_dir = os.path.realpath(os.path.dirname(__file__))
default_config_dir = os.environ.get('CONFINI_DIR', os.path.join(script_dir, 'config'))
arg_parser = argparse.ArgumentParser()
arg_parser.add_argument('-c', type=str, default=default_config_dir, help='Config dir')
arg_parser.add_argument('-v', action='store_true', help='Be verbose')
arg_parser.add_argument('-vv', action='store_true', help='Be more verbose')
arg_parser.add_argument('--userdir', type=str, help='path to users export dir tree')
arg_parser.add_argument('pins_dir', type=str, help='path to pin export dir tree')
args = arg_parser.parse_args()
if args.v:
logg.setLevel(logging.INFO)
elif args.vv:
logg.setLevel(logging.DEBUG)
config = confini.Config(args.c, os.environ.get('CONFINI_ENV_PREFIX'))
config.process()
logg.info('loaded config\n{}'.format(config))
celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL'))
user_dir = args.userdir
pins_dir = args.pins_dir
def generate_password_hash():
key = Fernet.generate_key()
fnt = Fernet(key)
pin = str(random.randint(1000, 9999))
return fnt.encrypt(bcrypt.hashpw(pin.encode('utf-8'), bcrypt.gensalt())).decode()
user_old_dir = os.path.join(user_dir, 'old')
logg.debug(f'reading user data from: {user_old_dir}')
pins_file = open(f'{pins_dir}/pins.csv', 'w')
if __name__ == '__main__':
for x in os.walk(user_old_dir):
for y in x[2]:
# skip non-json files
if y[len(y) - 5:] != '.json':
continue
# define file path for
filepath = None
if y != 'ussd_data.json':
filepath = os.path.join(x[0], y)
f = open(filepath, 'r')
try:
o = json.load(f)
except json.decoder.JSONDecodeError as e:
f.close()
logg.error('load error for {}: {}'.format(y, e))
continue
f.close()
u = Person.deserialize(o)
phone_object = phonenumbers.parse(u.tel)
phone = phonenumbers.format_number(phone_object, phonenumbers.PhoneNumberFormat.E164)
password_hash = generate_password_hash()
pins_file.write(f'{phone},{password_hash}\n')
logg.info(f'Writing phone: {phone}, password_hash: {password_hash}')
pins_file.close()

View File

@ -130,6 +130,7 @@ def genCats():
def genAmount():
return random.randint(0, gift_max) * gift_factor
def genDob():
dob_src = fake.date_of_birth(minimum_age=15)
dob = {}
@ -168,8 +169,9 @@ def gen():
}
p.location['area_name'] = city
if random.randint(0, 1):
p.identities['latitude'] = (random.random() + 180) - 90 #fake.local_latitude()
p.identities['longitude'] = (random.random() + 360) - 180 #fake.local_latitude()
p.location['latitude'] = (random.random() + 180) - 90 #fake.local_latitude()
p.location['longitude'] = (random.random() + 360) - 180 #fake.local_latitude()
return (old_blockchain_checksum_address, phone, p)
@ -215,6 +217,20 @@ if __name__ == '__main__':
json.dump(o.serialize(), f)
f.close()
# create ussd data
ussd_data_dir = os.path.join(d, 'ussd_data')
os.makedirs(ussd_data_dir)
f = open('{}/{}/{}'.format(d, 'ussd_data', 'ussd_data.json'), 'w')
ussd_data = {
'phone': phone,
'is_activated': 1,
'preferred_language': random.sample(['en', 'sw'], 1)[0],
'is_disabled': False
}
json.dump(ussd_data, f)
f.close()
pidx = genPhoneIndex(phone)
d = prepareLocalFilePath(os.path.join(user_dir, 'phone'), pidx)
f = open('{}/{}'.format(d, pidx), 'w')

View File

@ -1,52 +1,43 @@
# standard imports
import argparse
import copy
import csv
import hashlib
import json
import logging
import os
import sys
import logging
import time
import argparse
import sys
import re
import hashlib
import csv
import json
import urllib
import copy
import uuid
import urllib.request
import uuid
from collections import Counter
# external imports
import celery
import eth_abi
import confini
from hexathon import (
strip_0x,
add_0x,
)
from chainsyncer.backend.memory import MemBackend
from chainsyncer.driver import HeadSyncer
import eth_abi
import psycopg2
from chainlib.chain import ChainSpec
from chainlib.eth.address import to_checksum_address
from chainlib.eth.connection import EthHTTPConnection
from chainlib.eth.constant import ZERO_ADDRESS
from chainlib.eth.block import (
block_latest,
block_by_number,
Block,
)
from chainlib.hash import keccak256_string_to_hex
from chainlib.eth.address import to_checksum_address
from chainlib.eth.erc20 import ERC20
from chainlib.eth.gas import (
OverrideGasOracle,
balance,
)
OverrideGasOracle,
balance,
)
from chainlib.eth.tx import TxFactory
from chainlib.hash import keccak256_string_to_hex
from chainlib.jsonrpc import jsonrpc_template
from chainlib.eth.error import EthException
from cic_types.models.person import (
Person,
generate_metadata_pointer,
)
Person,
generate_metadata_pointer,
)
from erc20_single_shot_faucet import SingleShotFaucet
from hexathon import (
strip_0x,
add_0x,
)
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
@ -72,6 +63,7 @@ eth_tests = [
phone_tests = [
'ussd',
'ussd_pins'
]
all_tests = eth_tests + custodial_tests + metadata_tests + phone_tests
@ -171,6 +163,39 @@ if logg.isEnabledFor(logging.DEBUG):
outfunc = logg.debug
def send_ussd_request(address, data_dir):
upper_address = strip_0x(address).upper()
f = open(os.path.join(
data_dir,
'new',
upper_address[:2],
upper_address[2:4],
upper_address + '.json',
), 'r'
)
o = json.load(f)
f.close()
p = Person.deserialize(o)
phone = p.tel
session = uuid.uuid4().hex
data = {
'sessionId': session,
'serviceCode': config.get('APP_SERVICE_CODE'),
'phoneNumber': phone,
'text': '',
}
req = urllib.request.Request(config.get('_USSD_PROVIDER'))
data_str = json.dumps(data)
data_bytes = data_str.encode('utf-8')
req.add_header('Content-Type', 'application/json')
req.data = data_bytes
response = urllib.request.urlopen(req)
return response.read().decode('utf-8')
class VerifierState:
def __init__(self, item_keys, active_tests=None):
@ -354,42 +379,18 @@ class Verifier:
def verify_ussd(self, address, balance=None):
upper_address = strip_0x(address).upper()
f = open(os.path.join(
self.data_dir,
'new',
upper_address[:2],
upper_address[2:4],
upper_address + '.json',
), 'r'
)
o = json.load(f)
f.close()
p = Person.deserialize(o)
phone = p.tel
session = uuid.uuid4().hex
data = {
'sessionId': session,
'serviceCode': config.get('APP_SERVICE_CODE'),
'phoneNumber': phone,
'text': config.get('APP_SERVICE_CODE'),
}
req = urllib.request.Request(config.get('_USSD_PROVIDER'))
data_str = json.dumps(data)
data_bytes = data_str.encode('utf-8')
req.add_header('Content-Type', 'application/json')
req.data = data_bytes
response = urllib.request.urlopen(req)
response_data = response.read().decode('utf-8')
response_data = send_ussd_request(address, self.data_dir)
state = response_data[:3]
out = response_data[4:]
m = '{} {}'.format(state, out[:7])
if m != 'CON Welcome':
raise VerifierError(response_data, 'ussd')
def verify_ussd_pins(self, address, balance):
response_data = send_ussd_request(address, self.data_dir)
if response_data[:11] != 'CON Balance':
raise VerifierError(response_data, 'pins')
def verify(self, address, balance, debug_stem=None):