Refactors handling import of users.

This commit is contained in:
PhilipWafula 2021-05-03 11:36:19 +03:00
parent 0d57fb8679
commit 32e08dfddf
Signed by untrusted user: mango-habanero
GPG Key ID: B00CE9034DA19FB7

View File

@ -87,6 +87,13 @@ chain_str = str(chain_spec)
batch_size = args.batch_size batch_size = args.batch_size
batch_delay = args.batch_delay batch_delay = args.batch_delay
db_configs = {
'database': config.get('DATABASE_NAME'),
'host': config.get('DATABASE_HOST'),
'port': config.get('DATABASE_PORT'),
'user': config.get('DATABASE_USER'),
'password': config.get('DATABASE_PASSWORD')
}
def build_ussd_request(phone, host, port, service_code, username, password, ssl=False): def build_ussd_request(phone, host, port, service_code, username, password, ssl=False):
@ -135,57 +142,60 @@ if __name__ == '__main__':
for y in x[2]: for y in x[2]:
if y[len(y)-5:] != '.json': if y[len(y)-5:] != '.json':
continue continue
filepath = os.path.join(x[0], y) # handle json containing person object
f = open(filepath, 'r') filepath = None
try: if y != 'ussd_data.json':
o = json.load(f) filepath = os.path.join(x[0], y)
except json.decoder.JSONDecodeError as e: f = open(filepath, 'r')
try:
o = json.load(f)
except json.decoder.JSONDecodeError as e:
f.close()
logg.error('load error for {}: {}'.format(y, e))
continue
f.close() f.close()
logg.error('load error for {}: {}'.format(y, e)) u = Person.deserialize(o)
continue
f.close()
u = Person.deserialize(o)
new_address = register_ussd(i, u) new_address = register_ussd(i, u)
phone_object = phonenumbers.parse(u.tel) phone_object = phonenumbers.parse(u.tel)
phone = phonenumbers.format_number(phone_object, phonenumbers.PhoneNumberFormat.E164) phone = phonenumbers.format_number(phone_object, phonenumbers.PhoneNumberFormat.E164)
s_phone = celery.signature( s_phone = celery.signature(
'import_task.resolve_phone', 'import_task.resolve_phone',
[ [
phone, phone,
], ],
queue='cic-import-ussd', queue='cic-import-ussd',
) )
s_meta = celery.signature( s_meta = celery.signature(
'import_task.generate_metadata', 'import_task.generate_metadata',
[ [
phone, phone,
], ],
queue='cic-import-ussd', queue='cic-import-ussd',
) )
s_balance = celery.signature( s_balance = celery.signature(
'import_task.opening_balance_tx', 'import_task.opening_balance_tx',
[ [
phone, phone,
i, i,
], ],
queue='cic-import-ussd', queue='cic-import-ussd',
) )
s_meta.link(s_balance) s_meta.link(s_balance)
s_phone.link(s_meta) s_phone.link(s_meta)
s_phone.apply_async(countdown=7) # block time plus a bit of time for ussd processing # block time plus a bit of time for ussd processing
s_phone.apply_async(countdown=7)
i += 1 i += 1
sys.stdout.write('imported {} {}'.format(i, u).ljust(200) + "\r") sys.stdout.write('imported {} {}'.format(i, u).ljust(200) + "\r")
j += 1 j += 1
if j == batch_size: if j == batch_size:
time.sleep(batch_delay) time.sleep(batch_delay)
j = 0 j = 0
#fi.close()