2021-02-21 16:41:37 +01:00
# standard imports
2021-06-23 06:29:38 +02:00
import argparse
2021-02-21 16:41:37 +01:00
import json
import logging
2021-06-23 06:29:38 +02:00
import os
import sys
2021-02-21 16:41:37 +01:00
import time
2021-04-09 15:00:15 +02:00
import urllib . request
2021-06-23 06:29:38 +02:00
import uuid
from urllib . parse import urlencode
2021-02-21 16:41:37 +01:00
2021-06-23 06:29:38 +02:00
# external imports
2021-02-21 16:41:37 +01:00
import celery
2021-06-23 06:29:38 +02:00
import confini
2021-04-09 15:00:15 +02:00
import phonenumbers
2021-06-23 06:29:38 +02:00
import redis
from chainlib . chain import ChainSpec
from cic_types . models . person import Person
2021-02-21 16:41:37 +01:00
2021-07-03 18:55:51 +02:00
# local imports
from import_util import get_celery_worker_status
2021-02-21 16:41:37 +01:00
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()

default_config_dir = '/usr/local/etc/cic'

# Command line interface of the import script.
argparser = argparse.ArgumentParser()

# configuration and chain selection
argparser.add_argument('-c', type=str, default=default_config_dir, help='config file')
argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='Chain specification string')

# redis connection overrides (fall back to the loaded config when omitted)
argparser.add_argument('--redis-host', dest='redis_host', type=str, help='redis host to use for task submission')
argparser.add_argument('--redis-port', dest='redis_port', type=int, help='redis port to use for task submission')
argparser.add_argument('--redis-db', dest='redis_db', type=int, help='redis db to use for task submission and callback')

# batching / throttling
# batch size should be slightly below cumulative gas limit worth, eg 80000 gas
# txs with 8000000 limit is a bit less than 100 batch size
argparser.add_argument('--batch-size', dest='batch_size', default=100, type=int,
                       help='burst size of sending transactions to node')
argparser.add_argument('--batch-delay', dest='batch_delay', default=3, type=int, help='seconds delay between batches')
argparser.add_argument('--timeout', default=60.0, type=float, help='Callback timeout')

# ussd endpoint
argparser.add_argument('--ussd-host', dest='ussd_host', type=str,
                       help="host to ussd app responsible for processing ussd requests.")
argparser.add_argument('--ussd-port', dest='ussd_port', type=str,
                       help="port to ussd app responsible for processing ussd requests.")
argparser.add_argument('--ussd-no-ssl', dest='ussd_no_ssl', help='do not use ssl (careful)', action='store_true')

# queue, verbosity and the positional export directory
argparser.add_argument('-q', type=str, default='cic-eth', help='Task queue')
argparser.add_argument('-v', action='store_true', help='Be verbose')
argparser.add_argument('-vv', action='store_true', help='Be more verbose')
argparser.add_argument('user_dir', type=str, help='path to users export dir tree')
args = argparser.parse_args()

# Check -vv first so that the more verbose flag wins when both are supplied.
if args.vv:
    logg.setLevel(logging.DEBUG)
elif args.v:
    logg.setLevel(logging.INFO)

# Load configuration from the config directory, then let explicitly passed
# command line values override the corresponding config keys.
config_dir = args.c
config = confini.Config(config_dir, os.environ.get('CONFINI_ENV_PREFIX'))
config.process()
args_override = {
    'CIC_CHAIN_SPEC': args.i,
    'REDIS_HOST': args.redis_host,
    'REDIS_PORT': args.redis_port,
    'REDIS_DB': args.redis_db,
}
config.dict_override(args_override, 'cli')
logg.debug('config loaded from {}:\n{}'.format(args.c, config))
2021-02-21 16:41:37 +01:00
# Celery application used to submit the import tasks.
celery_app = celery.Celery(
    broker=config.get('CELERY_BROKER_URL'),
    backend=config.get('CELERY_RESULT_URL'),
)
# NOTE(review): presumably checks that a celery worker is available before
# any task is queued -- confirm against import_util.
get_celery_worker_status(celery_app=celery_app)

# Redis connection built from the (possibly cli-overridden) config values.
redis_host, redis_port, redis_db = (
    config.get(key) for key in ('REDIS_HOST', 'REDIS_PORT', 'REDIS_DB')
)
r = redis.Redis(redis_host, redis_port, redis_db)
ps = r.pubsub()
# Layout of the users export tree rooted at the positional user_dir argument.
user_new_dir = os.path.join(args.user_dir, 'new')
ussd_data_dir = os.path.join(args.user_dir, 'ussd')
preferences_dir = os.path.join(args.user_dir, 'preferences')
meta_dir = os.path.join(args.user_dir, 'meta')
user_old_dir = os.path.join(args.user_dir, 'old')
txs_dir = os.path.join(args.user_dir, 'txs')

# Output directories are created on demand.
for _output_dir in (
    user_new_dir,
    ussd_data_dir,
    os.path.join(preferences_dir, 'meta'),
    meta_dir,
):
    os.makedirs(_output_dir, exist_ok=True)

# The 'old' directory is the import source and is not created here;
# os.stat raises if it does not exist.
os.stat(user_old_dir)

os.makedirs(txs_dir, exist_ok=True)
2021-04-09 15:00:15 +02:00
2021-02-21 16:41:37 +01:00
# Chain the import targets, as object and in string form.
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
chain_str = str(chain_spec)

# Throttling parameters taken from the command line.
batch_size, batch_delay = args.batch_size, args.batch_delay

# USSD endpoint settings; ssl is on unless --ussd-no-ssl was passed.
ussd_host, ussd_port = args.ussd_host, args.ussd_port
ussd_no_ssl = args.ussd_no_ssl
ussd_ssl = ussd_no_ssl is not True
2021-02-21 16:41:37 +01:00
2021-07-03 18:55:51 +02:00
2021-04-09 15:00:15 +02:00
def build_ussd_request(phone, host, port, service_code, username, password, ssl=False):
    """Build the HTTP POST request that opens a ussd session for a phone number.

    :param phone: Phone number sent as the ``phoneNumber`` form field.
    :param host: Host of the ussd service.
    :param port: Port of the ussd service; omitted from the url when falsy.
    :param service_code: Sent as both the ``serviceCode`` and ``text`` form fields.
    :param username: Value of the ``username`` query parameter.
    :param password: Value of the ``password`` query parameter.
    :param ssl: Use ``https`` instead of ``http`` when truthy.
    :returns: Request object ready to be passed to ``urllib.request.urlopen``.
    :rtype: urllib.request.Request
    """
    scheme = 'https' if ssl else 'http'
    url = '{}://{}'.format(scheme, host)
    if port:
        url += ':{}'.format(port)
    # Percent-encode the credentials so reserved characters (&, #, space, ...)
    # cannot corrupt the query string.
    url += '/?' + urlencode({'username': username, 'password': password})

    # NOTE: the logged url includes the credentials query string.
    logg.info('ussd service url {}'.format(url))
    logg.info('ussd phone {}'.format(phone))

    # every request opens a fresh session
    session = uuid.uuid4().hex
    data = {
        'sessionId': session,
        'serviceCode': service_code,
        'phoneNumber': phone,
        'text': service_code,
    }
    data_bytes = urlencode(data).encode('utf-8')

    req = urllib.request.Request(url, data=data_bytes, method='POST')
    req.add_header('Content-Type', 'application/x-www-form-urlencoded')
    return req
def register_ussd(i, u):
    """Send the initial ussd request for a person to the ussd service.

    The phone number is normalized to E.164 before the request is built from
    the module level ussd host, port and ssl settings and the configured
    ``APP_SERVICE_CODE``. The response body is only logged.

    :param i: Running index of the person being imported (currently unused).
    :param u: Person object; only its ``tel`` attribute is read.
    """
    phone_object = phonenumbers.parse(u.tel)
    phone = phonenumbers.format_number(phone_object, phonenumbers.PhoneNumberFormat.E164)
    logg.debug('tel {} {}'.format(u.tel, phone))

    req = build_ussd_request(
        phone,
        ussd_host,
        ussd_port,
        config.get('APP_SERVICE_CODE'),
        '',
        '',
        ussd_ssl,
    )

    # Close the connection as soon as the body is read; this runs once per
    # imported user, so leaked sockets would pile up quickly.
    with urllib.request.urlopen(req) as response:
        response_data = response.read().decode('utf-8')

    # The first four characters are skipped before logging.
    # NOTE(review): presumably a three letter state prefix plus a separator -- confirm.
    out = response_data[4:]
    logg.debug('ussd reponse: {}'.format(out))
2021-02-21 16:41:37 +01:00
2021-04-09 15:00:15 +02:00
if __name__ == '__main__':
    # Walk the old users export tree; for every person json file found,
    # register the person over ussd and queue the celery import task chain.

    i = 0  # number of users imported so far
    j = 0  # number of users processed in the current batch

    for dirpath, _dirnames, filenames in os.walk(user_old_dir):
        for filename in filenames:
            if not filename.endswith('.json'):
                continue

            # handle json containing person object
            filepath = os.path.join(dirpath, filename)
            try:
                # the context manager closes the file on every exit path
                with open(filepath, 'r') as f:
                    o = json.load(f)
            except json.decoder.JSONDecodeError as e:
                logg.error('load error for {}: {}'.format(filename, e))
                continue

            u = Person.deserialize(o)

            register_ussd(i, u)

            phone_object = phonenumbers.parse(u.tel)
            phone = phonenumbers.format_number(phone_object, phonenumbers.PhoneNumberFormat.E164)

            s_phone = celery.signature(
                'import_task.resolve_phone',
                [
                    phone,
                ],
                queue='cic-import-ussd',
            )

            s_meta = celery.signature(
                'import_task.generate_metadata',
                [
                    phone,
                ],
                queue='cic-import-ussd',
            )

            s_balance = celery.signature(
                'import_task.opening_balance_tx',
                [
                    phone,
                    i,
                ],
                queue='cic-import-ussd',
            )

            # chain: resolve_phone -> generate_metadata -> opening_balance_tx
            s_meta.link(s_balance)
            s_phone.link(s_meta)
            # block time plus a bit of time for ussd processing
            s_phone.apply_async(countdown=7)

            i += 1
            # carriage return keeps the progress output on a single line
            sys.stdout.write('imported {} {}'.format(i, u).ljust(200) + "\r")

            # pause after every full batch
            j += 1
            if j == batch_size:
                time.sleep(batch_delay)
                j = 0