# standard imports
import os
import sys
import json
import logging
import argparse
import uuid
import datetime
import time
import urllib.request
from glob import glob
# third-party imports
import redis
import confini
import celery
from hexathon import (
        add_0x,
        strip_0x,
        )
from chainlib.eth.address import to_checksum
from cic_types.models.person import Person
from cic_eth.api.api_task import Api
from chainlib.chain import ChainSpec
from cic_types.processor import generate_metadata_pointer
import phonenumbers
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()

default_config_dir = '/usr/local/etc/cic'

argparser = argparse.ArgumentParser()
argparser.add_argument('-c', type=str, default=default_config_dir, help='config file')
argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='chain specification string')
argparser.add_argument('--redis-host', dest='redis_host', type=str, help='redis host to use for task submission')
argparser.add_argument('--redis-port', dest='redis_port', type=int, help='redis port to use for task submission')
argparser.add_argument('--redis-db', dest='redis_db', type=int, help='redis db to use for task submission and callback')
argparser.add_argument('--batch-size', dest='batch_size', default=100, type=int, help='burst size of sending transactions to node')  # batch size should be slightly below cumulative gas limit worth, eg 80000 gas txs with 8000000 limit is a bit less than 100 batch size
argparser.add_argument('--batch-delay', dest='batch_delay', default=3, type=int, help='seconds delay between batches')
argparser.add_argument('--timeout', default=60.0, type=float, help='callback timeout')
argparser.add_argument('-q', type=str, default='cic-eth', help='task queue')
argparser.add_argument('-v', action='store_true', help='be verbose')
argparser.add_argument('-vv', action='store_true', help='be more verbose')
argparser.add_argument('user_dir', type=str, help='path to users export dir tree')
args = argparser.parse_args()
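
# Example invocation (hypothetical file name and values, shown for illustration
# only; the chain spec and redis settings must match your deployment):
#
#   python import_ussd_users.py -c /usr/local/etc/cic -i evm:bloxberg:8996 \
#       --redis-host localhost --redis-port 6379 --redis-db 0 <user_dir>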
if args.v:
    logg.setLevel(logging.INFO)
elif args.vv:
    logg.setLevel(logging.DEBUG)
config_dir = args.c
config = confini.Config(config_dir, os.environ.get('CONFINI_ENV_PREFIX'))
config.process()
args_override = {
        'CIC_CHAIN_SPEC': getattr(args, 'i'),
        'REDIS_HOST': getattr(args, 'redis_host'),
        'REDIS_PORT': getattr(args, 'redis_port'),
        'REDIS_DB': getattr(args, 'redis_db'),
        }
config.dict_override(args_override, 'cli')
celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL'))

redis_host = config.get('REDIS_HOST')
redis_port = config.get('REDIS_PORT')
redis_db = config.get('REDIS_DB')
r = redis.Redis(redis_host, redis_port, redis_db)
ps = r.pubsub()
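
# NOTE: the redis connection above mirrors the --redis-* arguments and is meant
# for task submission and callbacks; the pubsub handle is not consumed further
# in the code shown here.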
user_new_dir = os.path.join(args.user_dir, 'new')
os.makedirs(user_new_dir)

meta_dir = os.path.join(args.user_dir, 'meta')
os.makedirs(meta_dir)

user_old_dir = os.path.join(args.user_dir, 'old')
os.stat(user_old_dir)
txs_dir = os.path.join(args.user_dir, 'txs')
os.makedirs(txs_dir)
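
# NOTE on the directory layout set up above: user_dir must already contain an
# 'old' subdirectory with the exported person json files (os.stat fails
# otherwise), while 'new', 'meta' and 'txs' are created fresh here.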
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
chain_str = str(chain_spec)

batch_size = args.batch_size
batch_delay = args.batch_delay
db_configs = {
        'database': config.get('DATABASE_NAME'),
        'host': config.get('DATABASE_HOST'),
        'port': config.get('DATABASE_PORT'),
        'user': config.get('DATABASE_USER'),
        'password': config.get('DATABASE_PASSWORD'),
        }
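
# NOTE (assumption): db_configs gathers database connection settings from the
# config but is not referenced further in this part of the script; it is
# presumably consumed by import tasks defined elsewhere.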


def build_ussd_request(phone, host, port, service_code, username, password, ssl=False):
    url = 'http'
    if ssl:
        url += 's'
    url += '://{}:{}'.format(host, port)
    url += '/?username={}&password={}'.format(username, password)  # config.get('USSD_USER'), config.get('USSD_PASS')
    logg.info('ussd service url {}'.format(url))
    logg.info('ussd phone {}'.format(phone))

    session = uuid.uuid4().hex
    data = {
        'sessionId': session,
        'serviceCode': service_code,
        'phoneNumber': phone,
        'text': service_code,
        }

    req = urllib.request.Request(url)
    data_str = json.dumps(data)
    data_bytes = data_str.encode('utf-8')
    req.add_header('Content-Type', 'application/json')
    req.data = data_bytes
    return req
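

# Note on the USSD gateway response (assumption): register_ussd() below treats
# the first three characters of the reply as the session state and the
# remainder (after a separating character) as the menu text, consistent with an
# AfricasTalking-style 'CON ...'/'END ...' reply. This is inferred from the
# slicing below, not confirmed by this script.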
def register_ussd(i, u):
    phone_object = phonenumbers.parse(u.tel)
    phone = phonenumbers.format_number(phone_object, phonenumbers.PhoneNumberFormat.E164)
    logg.debug('tel {} {}'.format(u.tel, phone))
    req = build_ussd_request(phone, 'localhost', 63315, '*483*46#', '', '')
    response = urllib.request.urlopen(req)
    response_data = response.read().decode('utf-8')
    state = response_data[:3]
    out = response_data[4:]
    logg.debug('ussd response: {}'.format(out))


if __name__ == '__main__':
    i = 0
    j = 0
    for x in os.walk(user_old_dir):
        for y in x[2]:
            if y[len(y)-5:] != '.json':
                continue
            # handle json containing person object
            filepath = None
            if y != 'ussd_data.json':
                filepath = os.path.join(x[0], y)
                f = open(filepath, 'r')
                try:
                    o = json.load(f)
                except json.decoder.JSONDecodeError as e:
                    f.close()
                    logg.error('load error for {}: {}'.format(y, e))
                    continue
                f.close()

                u = Person.deserialize(o)

                new_address = register_ussd(i, u)

                phone_object = phonenumbers.parse(u.tel)
                phone = phonenumbers.format_number(phone_object, phonenumbers.PhoneNumberFormat.E164)

                s_phone = celery.signature(
                        'import_task.resolve_phone',
                        [
                            phone,
                            ],
                        queue='cic-import-ussd',
                        )
                s_meta = celery.signature(
                        'import_task.generate_metadata',
                        [
                            phone,
                            ],
                        queue='cic-import-ussd',
                        )
                s_balance = celery.signature(
                        'import_task.opening_balance_tx',
                        [
                            phone,
                            i,
                            ],
                        queue='cic-import-ussd',
                        )
                s_meta.link(s_balance)
                s_phone.link(s_meta)

                # block time plus a bit of time for ussd processing
                s_phone.apply_async(countdown=7)

                # counters stay inside the person branch so u is always defined
                i += 1
                sys.stdout.write('imported {} {}'.format(i, u).ljust(200) + "\r")

                j += 1
                if j == batch_size:
                    time.sleep(batch_delay)
                    j = 0