Rehabilitate ussd import scripts

This commit is contained in:
Louis Holbrook
2021-06-23 04:29:38 +00:00
committed by Philip Wafula
parent e5b1352970
commit 12ab5c2f66
11 changed files with 46 additions and 65 deletions

View File

@@ -114,7 +114,7 @@ def main():
conn = EthHTTPConnection(config.get('ETH_PROVIDER'))
ImportTask.balance_processor = BalanceProcessor(conn, chain_spec, config.get('CIC_REGISTRY_ADDRESS'), signer_address, signer)
ImportTask.balance_processor.init()
ImportTask.balance_processor.init(token_symbol)
# TODO get decimals from token
balances = {}
@@ -139,6 +139,7 @@ def main():
ImportTask.balances = balances
ImportTask.count = i
ImportTask.import_dir = user_dir
s = celery.signature(
'import_task.send_txs',

View File

@@ -39,6 +39,7 @@ elif args.vv:
config_dir = args.c
config = confini.Config(config_dir, os.environ.get('CONFINI_ENV_PREFIX'))
config.process()
logg.debug('config loaded from {}:\n{}'.format(args.c, config))
celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL'))
@@ -62,9 +63,6 @@ def main():
)
s_import_pins.apply_async()
argv = ['worker', '-Q', 'cic-import-ussd', '--loglevel=DEBUG']
celery_app.worker_main(argv)
if __name__ == '__main__':
main()

View File

@@ -1,29 +1,21 @@
# standard imports
import os
import sys
import argparse
import json
import logging
import argparse
import uuid
import datetime
import os
import sys
import time
import urllib.request
from glob import glob
import uuid
from urllib.parse import urlencode
# third-party imports
import redis
import confini
# external imports
import celery
from hexathon import (
add_0x,
strip_0x,
)
from chainlib.eth.address import to_checksum
from cic_types.models.person import Person
from cic_eth.api.api_task import Api
from chainlib.chain import ChainSpec
from cic_types.processor import generate_metadata_pointer
import confini
import phonenumbers
import redis
from chainlib.chain import ChainSpec
from cic_types.models.person import Person
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
@@ -87,21 +79,13 @@ chain_str = str(chain_spec)
batch_size = args.batch_size
batch_delay = args.batch_delay
db_configs = {
'database': config.get('DATABASE_NAME'),
'host': config.get('DATABASE_HOST'),
'port': config.get('DATABASE_PORT'),
'user': config.get('DATABASE_USER'),
'password': config.get('DATABASE_PASSWORD')
}
def build_ussd_request(phone, host, port, service_code, username, password, ssl=False):
url = 'http'
if ssl:
url += 's'
url += '://{}:{}'.format(host, port)
url += '/?username={}&password={}'.format(username, password) #config.get('USSD_USER'), config.get('USSD_PASS'))
url += '/?username={}&password={}'.format(username, password)
logg.info('ussd service url {}'.format(url))
logg.info('ussd phone {}'.format(phone))
@@ -114,9 +98,10 @@ def build_ussd_request(phone, host, port, service_code, username, password, ssl=
'text': service_code,
}
req = urllib.request.Request(url)
data_str = json.dumps(data)
req.method=('POST')
data_str = urlencode(data)
data_bytes = data_str.encode('utf-8')
req.add_header('Content-Type', 'application/json')
req.add_header('Content-Type', 'application/x-www-form-urlencoded')
req.data = data_bytes
return req

View File

@@ -31,9 +31,9 @@ elif args.vv:
config_dir = args.c
config = Config(config_dir, os.environ.get('CONFINI_ENV_PREFIX'))
config.process()
logg.debug('config loaded from {}:\n{}'.format(args.c, config))
user_old_dir = os.path.join(args.user_dir, 'old')
os.stat(user_old_dir)
ussd_data_dir = os.path.join(args.user_dir, 'ussd')
db_configs = {
'database': config.get('DATABASE_NAME'),
@@ -45,18 +45,15 @@ db_configs = {
celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL'))
if __name__ == '__main__':
for x in os.walk(user_old_dir):
for x in os.walk(ussd_data_dir):
for y in x[2]:
if y[len(y) - 5:] != '.json':
continue
# handle ussd_data json object
if y[:15] == '_ussd_data.json':
if y[len(y) - 5:] == '.json':
filepath = os.path.join(x[0], y)
f = open(filepath, 'r')
try:
ussd_data = json.load(f)
logg.debug(f'LOADING USSD DATA: {ussd_data}')
except json.decoder.JSONDecodeError as e:
f.close()
logg.error('load error for {}: {}'.format(y, e))

View File

@@ -6,7 +6,7 @@ from eth_contract_registry import Registry
from eth_token_index import TokenUniqueSymbolIndex
from chainlib.eth.gas import OverrideGasOracle
from chainlib.eth.nonce import OverrideNonceOracle
from chainlib.eth.erc20 import ERC20
from eth_erc20 import ERC20
from chainlib.eth.tx import (
count,
TxFormat,
@@ -37,7 +37,7 @@ class BalanceProcessor:
self.value_multiplier = 1
def init(self):
def init(self, token_symbol):
# Get Token registry address
registry = Registry(self.chain_spec)
o = registry.address_of(self.registry_address, 'TokenRegistry')
@@ -46,10 +46,10 @@ class BalanceProcessor:
logg.info('found token index address {}'.format(self.token_index_address))
token_registry = TokenUniqueSymbolIndex(self.chain_spec)
o = token_registry.address_of(self.token_index_address, 'SRF')
o = token_registry.address_of(self.token_index_address, token_symbol)
r = self.conn.do(o)
self.token_address = token_registry.parse_address_of(r)
logg.info('found SRF token address {}'.format(self.token_address))
logg.info('found {} token address {}'.format(token_symbol, self.token_address))
tx_factory = ERC20(self.chain_spec)
o = tx_factory.decimals(self.token_address)