Merge remote-tracking branch 'origin/master' into lash/free-api

This commit is contained in:
nolash 2021-07-05 12:25:05 +02:00
commit 54b857d286
8 changed files with 204 additions and 175 deletions

View File

@ -6,5 +6,4 @@ pytest-redis==2.0.0
redis==3.5.3 redis==3.5.3
eth-tester==0.5.0b3 eth-tester==0.5.0b3
py-evm==0.3.0a20 py-evm==0.3.0a20
eth-erc20~=0.0.10a2 eth-erc20~=0.0.10a2

View File

@ -127,6 +127,8 @@ def main():
argv.append('--loglevel=INFO') argv.append('--loglevel=INFO')
argv.append('-Q') argv.append('-Q')
argv.append(args.q) argv.append(args.q)
argv.append('-n')
argv.append(args.q)
current_app.worker_main(argv) current_app.worker_main(argv)

View File

@ -10,7 +10,7 @@ import redis
import celery import celery
from cic_eth_registry.registry import CICRegistry from cic_eth_registry.registry import CICRegistry
from chainsyncer.backend.memory import MemBackend from chainsyncer.backend.memory import MemBackend
from chainsyncer.driver import HeadSyncer from chainsyncer.driver.head import HeadSyncer
from chainlib.eth.connection import EthHTTPConnection from chainlib.eth.connection import EthHTTPConnection
from chainlib.chain import ChainSpec from chainlib.chain import ChainSpec
from chainlib.eth.gas import RPCGasOracle from chainlib.eth.gas import RPCGasOracle
@ -24,6 +24,7 @@ from cic_base import (
rpc, rpc,
signer as signer_funcs, signer as signer_funcs,
) )
from cic_base.eth.syncer import chain_interface
# local imports # local imports
#import common #import common
@ -120,7 +121,7 @@ def main():
'api_queue': config.get('_CELERY_QUEUE'), 'api_queue': config.get('_CELERY_QUEUE'),
} }
syncer = HeadSyncer(syncer_backend, block_callback=handler.refresh) syncer = HeadSyncer(syncer_backend, chain_interface, block_callback=handler.refresh)
syncer.add_filter(handler) syncer.add_filter(handler)
syncer.loop(1, conn) syncer.loop(1, conn)

View File

@ -1,28 +1,22 @@
# standard imports # standard imports
import os
import sys
import logging
import argparse import argparse
import hashlib import logging
import redis import sys
import celery import os
# external imports # external imports
import celery
import confini import confini
from chainlib.eth.connection import EthHTTPConnection import redis
from chainlib.chain import ChainSpec from chainlib.chain import ChainSpec
from hexathon import (
strip_0x,
add_0x,
)
from chainlib.eth.address import to_checksum_address from chainlib.eth.address import to_checksum_address
from chainlib.eth.connection import EthHTTPConnection
from crypto_dev_signer.eth.signer import ReferenceSigner as EIP155Signer from crypto_dev_signer.eth.signer import ReferenceSigner as EIP155Signer
from crypto_dev_signer.keystore.dict import DictKeystore from crypto_dev_signer.keystore.dict import DictKeystore
from cic_types.models.person import Person
# local imports # local imports
from import_util import BalanceProcessor from import_task import ImportTask, MetadataTask
from import_task import * from import_util import BalanceProcessor, get_celery_worker_status
logging.basicConfig(level=logging.WARNING) logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger() logg = logging.getLogger()
@ -41,37 +35,41 @@ argparser.add_argument('--meta-port', dest='meta_port', type=int, help='metadata
argparser.add_argument('--redis-host', dest='redis_host', type=str, help='redis host to use for task submission') argparser.add_argument('--redis-host', dest='redis_host', type=str, help='redis host to use for task submission')
argparser.add_argument('--redis-port', dest='redis_port', type=int, help='redis host to use for task submission') argparser.add_argument('--redis-port', dest='redis_port', type=int, help='redis host to use for task submission')
argparser.add_argument('--redis-db', dest='redis_db', type=int, help='redis db to use for task submission and callback') argparser.add_argument('--redis-db', dest='redis_db', type=int, help='redis db to use for task submission and callback')
argparser.add_argument('--token-symbol', default='SRF', type=str, dest='token_symbol', help='Token symbol to use for trnsactions') argparser.add_argument('--token-symbol', default='GFT', type=str, dest='token_symbol',
help='Token symbol to use for transactions')
argparser.add_argument('--head', action='store_true', help='start at current block height (overrides --offset)') argparser.add_argument('--head', action='store_true', help='start at current block height (overrides --offset)')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration') argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str,
argparser.add_argument('-q', type=str, default='cic-eth', help='celery queue to submit transaction tasks to') help='environment prefix for variables to overwrite configuration')
argparser.add_argument('-q', type=str, default='cic-import-ussd', help='celery queue to submit transaction tasks to')
argparser.add_argument('--offset', type=int, default=0, help='block offset to start syncer from') argparser.add_argument('--offset', type=int, default=0, help='block offset to start syncer from')
argparser.add_argument('-v', help='be verbose', action='store_true') argparser.add_argument('-v', help='be verbose', action='store_true')
argparser.add_argument('-vv', help='be more verbose', action='store_true') argparser.add_argument('-vv', help='be more verbose', action='store_true')
argparser.add_argument('user_dir', default='out', type=str, help='user export directory') argparser.add_argument('user_dir', default='out', type=str, help='user export directory')
args = argparser.parse_args(sys.argv[1:]) args = argparser.parse_args(sys.argv[1:])
if args.v == True: if args.v:
logging.getLogger().setLevel(logging.INFO) logging.getLogger().setLevel(logging.INFO)
elif args.vv == True:
elif args.vv:
logging.getLogger().setLevel(logging.DEBUG) logging.getLogger().setLevel(logging.DEBUG)
config_dir = os.path.join(args.c) config_dir = os.path.join(args.c)
os.makedirs(config_dir, 0o777, True) os.makedirs(config_dir, 0o777, True)
config = confini.Config(config_dir, args.env_prefix) config = confini.Config(config_dir, args.env_prefix)
config.process() config.process()
# override args # override args
args_override = { args_override = {
'CIC_CHAIN_SPEC': getattr(args, 'i'), 'CIC_CHAIN_SPEC': getattr(args, 'i'),
'ETH_PROVIDER': getattr(args, 'p'), 'ETH_PROVIDER': getattr(args, 'p'),
'CIC_REGISTRY_ADDRESS': getattr(args, 'r'), 'CIC_REGISTRY_ADDRESS': getattr(args, 'r'),
'REDIS_HOST': getattr(args, 'redis_host'), 'REDIS_HOST': getattr(args, 'redis_host'),
'REDIS_PORT': getattr(args, 'redis_port'), 'REDIS_PORT': getattr(args, 'redis_port'),
'REDIS_DB': getattr(args, 'redis_db'), 'REDIS_DB': getattr(args, 'redis_db'),
'META_HOST': getattr(args, 'meta_host'), 'META_HOST': getattr(args, 'meta_host'),
'META_PORT': getattr(args, 'meta_port'), 'META_PORT': getattr(args, 'meta_port'),
'KEYSTORE_FILE_PATH': getattr(args, 'y') 'KEYSTORE_FILE_PATH': getattr(args, 'y')
} }
config.dict_override(args_override, 'cli flag') config.dict_override(args_override, 'cli flag')
config.censor('PASSWORD', 'DATABASE') config.censor('PASSWORD', 'DATABASE')
config.censor('PASSWORD', 'SSL') config.censor('PASSWORD', 'SSL')
@ -81,14 +79,19 @@ redis_host = config.get('REDIS_HOST')
redis_port = config.get('REDIS_PORT') redis_port = config.get('REDIS_PORT')
redis_db = config.get('REDIS_DB') redis_db = config.get('REDIS_DB')
r = redis.Redis(redis_host, redis_port, redis_db) r = redis.Redis(redis_host, redis_port, redis_db)
celery_app = celery.Celery(backend=config.get('CELERY_RESULT_URL'), broker=config.get('CELERY_BROKER_URL'))
# create celery apps
celery_app = celery.Celery(backend=config.get('CELERY_RESULT_URL'), broker=config.get('CELERY_BROKER_URL'))
status = get_celery_worker_status(celery_app=celery_app)
signer_address = None signer_address = None
keystore = DictKeystore() keystore = DictKeystore()
if args.y != None: if args.y is not None:
logg.debug('loading keystore file {}'.format(args.y)) logg.debug('loading keystore file {}'.format(args.y))
signer_address = keystore.import_keystore_file(args.y) signer_address = keystore.import_keystore_file(args.y)
logg.debug('now have key for signer address {}'.format(signer_address)) logg.debug('now have key for signer address {}'.format(signer_address))
# define signer
signer = EIP155Signer(keystore) signer = EIP155Signer(keystore)
queue = args.q queue = args.q
@ -103,7 +106,7 @@ chain_spec = ChainSpec.from_chain_str(chain_str)
old_chain_spec_str = args.old_chain_spec old_chain_spec_str = args.old_chain_spec
old_chain_spec = ChainSpec.from_chain_str(old_chain_spec_str) old_chain_spec = ChainSpec.from_chain_str(old_chain_spec_str)
user_dir = args.user_dir # user_out_dir from import_users.py user_dir = args.user_dir # user_out_dir from import_users.py
token_symbol = args.token_symbol token_symbol = args.token_symbol
@ -111,20 +114,22 @@ MetadataTask.meta_host = config.get('META_HOST')
MetadataTask.meta_port = config.get('META_PORT') MetadataTask.meta_port = config.get('META_PORT')
ImportTask.chain_spec = chain_spec ImportTask.chain_spec = chain_spec
def main(): def main():
conn = EthHTTPConnection(config.get('ETH_PROVIDER')) conn = EthHTTPConnection(config.get('ETH_PROVIDER'))
ImportTask.balance_processor = BalanceProcessor(conn, chain_spec, config.get('CIC_REGISTRY_ADDRESS'), signer_address, signer) ImportTask.balance_processor = BalanceProcessor(conn, chain_spec, config.get('CIC_REGISTRY_ADDRESS'),
signer_address, signer)
ImportTask.balance_processor.init(token_symbol) ImportTask.balance_processor.init(token_symbol)
# TODO get decimals from token # TODO get decimals from token
balances = {} balances = {}
f = open('{}/balances.csv'.format(user_dir, 'r')) f = open('{}/balances.csv'.format(user_dir, 'r'))
remove_zeros = 10**6 remove_zeros = 10 ** 6
i = 0 i = 0
while True: while True:
l = f.readline() l = f.readline()
if l == None: if l is None:
break break
r = l.split(',') r = l.split(',')
try: try:
@ -143,15 +148,23 @@ def main():
ImportTask.import_dir = user_dir ImportTask.import_dir = user_dir
s = celery.signature( s = celery.signature(
'import_task.send_txs', 'import_task.send_txs',
[ [
MetadataTask.balance_processor.nonce_offset, MetadataTask.balance_processor.nonce_offset,
], ],
queue='cic-import-ussd', queue=queue,
) )
s.apply_async() s.apply_async()
argv = ['worker', '-Q', 'cic-import-ussd', '--loglevel=DEBUG'] argv = ['worker']
if args.vv:
argv.append('--loglevel=DEBUG')
elif args.v:
argv.append('--loglevel=INFO')
argv.append('-Q')
argv.append(args.q)
argv.append('-n')
argv.append(args.q)
celery_app.worker_main(argv) celery_app.worker_main(argv)

View File

@ -9,7 +9,7 @@ import celery
import confini import confini
# local imports # local imports
from import_task import * from import_util import get_celery_worker_status
logging.basicConfig(level=logging.WARNING) logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger() logg = logging.getLogger()
@ -39,9 +39,12 @@ elif args.vv:
config_dir = args.c config_dir = args.c
config = confini.Config(config_dir, os.environ.get('CONFINI_ENV_PREFIX')) config = confini.Config(config_dir, os.environ.get('CONFINI_ENV_PREFIX'))
config.process() config.process()
config.censor('PASSWORD', 'DATABASE')
logg.debug('config loaded from {}:\n{}'.format(args.c, config)) logg.debug('config loaded from {}:\n{}'.format(args.c, config))
celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL')) celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL'))
status = get_celery_worker_status(celery_app=celery_app)
db_configs = { db_configs = {
'database': config.get('DATABASE_NAME'), 'database': config.get('DATABASE_NAME'),
@ -61,7 +64,8 @@ def main():
(db_configs, phone_to_pins), (db_configs, phone_to_pins),
queue=args.q queue=args.q
) )
s_import_pins.apply_async() result = s_import_pins.apply_async()
logg.debug(f'TASK: {result.id}, STATUS: {result.status}')
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -1,36 +1,33 @@
# standard imports # standard imports
import os
import logging
import random
import urllib.parse
import urllib.error
import urllib.request
import json import json
import logging
import os
import random
import urllib.error
import urllib.parse
import urllib.request
# external imports # external imports
import celery import celery
import psycopg2 import psycopg2
from psycopg2 import extras
from hexathon import (
strip_0x,
add_0x,
)
from chainlib.eth.address import to_checksum_address from chainlib.eth.address import to_checksum_address
from chainlib.eth.tx import ( from chainlib.eth.tx import (
unpack, unpack,
raw, raw,
) )
from cic_types.processor import generate_metadata_pointer
from cic_types.models.person import Person from cic_types.models.person import Person
from cic_types.processor import generate_metadata_pointer
from hexathon import (
strip_0x,
add_0x,
)
#logg = logging.getLogger().getChild(__name__)
logg = logging.getLogger() logg = logging.getLogger()
celery_app = celery.current_app celery_app = celery.current_app
class ImportTask(celery.Task): class ImportTask(celery.Task):
balances = None balances = None
import_dir = 'out' import_dir = 'out'
count = 0 count = 0
@ -38,16 +35,16 @@ class ImportTask(celery.Task):
balance_processor = None balance_processor = None
max_retries = None max_retries = None
class MetadataTask(ImportTask):
class MetadataTask(ImportTask):
meta_host = None meta_host = None
meta_port = None meta_port = None
meta_path = '' meta_path = ''
meta_ssl = False meta_ssl = False
autoretry_for = ( autoretry_for = (
urllib.error.HTTPError, urllib.error.HTTPError,
OSError, OSError,
) )
retry_jitter = True retry_jitter = True
retry_backoff = True retry_backoff = True
retry_backoff_max = 60 retry_backoff_max = 60
@ -64,12 +61,12 @@ class MetadataTask(ImportTask):
def old_address_from_phone(base_path, phone): def old_address_from_phone(base_path, phone):
pidx = generate_metadata_pointer(phone.encode('utf-8'), ':cic.phone') pidx = generate_metadata_pointer(phone.encode('utf-8'), ':cic.phone')
phone_idx_path = os.path.join('{}/phone/{}/{}/{}'.format( phone_idx_path = os.path.join('{}/phone/{}/{}/{}'.format(
base_path, base_path,
pidx[:2], pidx[:2],
pidx[2:4], pidx[2:4],
pidx, pidx,
) )
) )
f = open(phone_idx_path, 'r') f = open(phone_idx_path, 'r')
old_address = f.read() old_address = f.read()
f.close() f.close()
@ -97,11 +94,11 @@ def generate_metadata(self, address, phone):
logg.debug('address {}'.format(address)) logg.debug('address {}'.format(address))
old_address_upper = strip_0x(old_address).upper() old_address_upper = strip_0x(old_address).upper()
metadata_path = '{}/old/{}/{}/{}.json'.format( metadata_path = '{}/old/{}/{}/{}.json'.format(
self.import_dir, self.import_dir,
old_address_upper[:2], old_address_upper[:2],
old_address_upper[2:4], old_address_upper[2:4],
old_address_upper, old_address_upper,
) )
f = open(metadata_path, 'r') f = open(metadata_path, 'r')
o = json.load(f) o = json.load(f)
@ -116,12 +113,12 @@ def generate_metadata(self, address, phone):
new_address_clean = strip_0x(address) new_address_clean = strip_0x(address)
filepath = os.path.join( filepath = os.path.join(
self.import_dir, self.import_dir,
'new', 'new',
new_address_clean[:2].upper(), new_address_clean[:2].upper(),
new_address_clean[2:4].upper(), new_address_clean[2:4].upper(),
new_address_clean.upper() + '.json', new_address_clean.upper() + '.json',
) )
os.makedirs(os.path.dirname(filepath), exist_ok=True) os.makedirs(os.path.dirname(filepath), exist_ok=True)
o = u.serialize() o = u.serialize()
@ -131,10 +128,10 @@ def generate_metadata(self, address, phone):
meta_key = generate_metadata_pointer(bytes.fromhex(new_address_clean), ':cic.person') meta_key = generate_metadata_pointer(bytes.fromhex(new_address_clean), ':cic.person')
meta_filepath = os.path.join( meta_filepath = os.path.join(
self.import_dir, self.import_dir,
'meta', 'meta',
'{}.json'.format(new_address_clean.upper()), '{}.json'.format(new_address_clean.upper()),
) )
os.symlink(os.path.realpath(filepath), meta_filepath) os.symlink(os.path.realpath(filepath), meta_filepath)
# write ussd data # write ussd data
@ -180,10 +177,8 @@ def generate_metadata(self, address, phone):
@celery_app.task(bind=True, base=MetadataTask) @celery_app.task(bind=True, base=MetadataTask)
def opening_balance_tx(self, address, phone, serial): def opening_balance_tx(self, address, phone, serial):
old_address = old_address_from_phone(self.import_dir, phone) old_address = old_address_from_phone(self.import_dir, phone)
k = to_checksum_address(strip_0x(old_address)) k = to_checksum_address(strip_0x(old_address))
balance = self.balances[k] balance = self.balances[k]
logg.debug('found balance {} for address {} phone {}'.format(balance, old_address, phone)) logg.debug('found balance {} for address {} phone {}'.format(balance, old_address, phone))
@ -196,39 +191,39 @@ def opening_balance_tx(self, address, phone, serial):
logg.debug('generated tx token value {} to {} tx hash {}'.format(decimal_balance, address, tx_hash_hex)) logg.debug('generated tx token value {} to {} tx hash {}'.format(decimal_balance, address, tx_hash_hex))
tx_path = os.path.join( tx_path = os.path.join(
self.import_dir, self.import_dir,
'txs', 'txs',
strip_0x(tx_hash_hex), strip_0x(tx_hash_hex),
) )
f = open(tx_path, 'w') f = open(tx_path, 'w')
f.write(strip_0x(o)) f.write(strip_0x(o))
f.close() f.close()
tx_nonce_path = os.path.join( tx_nonce_path = os.path.join(
self.import_dir, self.import_dir,
'txs', 'txs',
'.' + str(tx['nonce']), '.' + str(tx['nonce']),
) )
os.symlink(os.path.realpath(tx_path), tx_nonce_path) os.symlink(os.path.realpath(tx_path), tx_nonce_path)
return tx['hash'] return tx['hash']
@celery_app.task(bind=True, base=ImportTask, autoretry_for=(FileNotFoundError,), max_retries=None, default_retry_delay=0.1) @celery_app.task(bind=True, base=ImportTask, autoretry_for=(FileNotFoundError,), max_retries=None,
default_retry_delay=0.1)
def send_txs(self, nonce): def send_txs(self, nonce):
if nonce == self.count + self.balance_processor.nonce_offset: if nonce == self.count + self.balance_processor.nonce_offset:
logg.info('reached nonce {} (offset {} + count {}) exiting'.format(nonce, self.balance_processor.nonce_offset, self.count)) logg.info('reached nonce {} (offset {} + count {}) exiting'.format(nonce, self.balance_processor.nonce_offset,
self.count))
return return
logg.debug('attempt to open symlink for nonce {}'.format(nonce)) logg.debug('attempt to open symlink for nonce {}'.format(nonce))
tx_nonce_path = os.path.join( tx_nonce_path = os.path.join(
self.import_dir, self.import_dir,
'txs', 'txs',
'.' + str(nonce), '.' + str(nonce),
) )
f = open(tx_nonce_path, 'r') f = open(tx_nonce_path, 'r')
tx_signed_raw_hex = f.read() tx_signed_raw_hex = f.read()
f.close() f.close()
@ -238,21 +233,20 @@ def send_txs(self, nonce):
o = raw(add_0x(tx_signed_raw_hex)) o = raw(add_0x(tx_signed_raw_hex))
tx_hash_hex = self.balance_processor.conn.do(o) tx_hash_hex = self.balance_processor.conn.do(o)
logg.info('sent nonce {} tx hash {}'.format(nonce, tx_hash_hex)) #tx_signed_raw_hex)) logg.info('sent nonce {} tx hash {}'.format(nonce, tx_hash_hex)) # tx_signed_raw_hex))
nonce += 1 nonce += 1
queue = self.request.delivery_info.get('routing_key') queue = self.request.delivery_info.get('routing_key')
s = celery.signature( s = celery.signature(
'import_task.send_txs', 'import_task.send_txs',
[ [
nonce, nonce,
], ],
queue=queue, queue=queue,
) )
s.apply_async() s.apply_async()
return nonce return nonce
@ -310,4 +304,3 @@ def set_ussd_data(config: dict, ussd_data: dict):
# close connections # close connections
db_cursor.close() db_cursor.close()
db_conn.close() db_conn.close()

View File

@ -17,6 +17,9 @@ import redis
from chainlib.chain import ChainSpec from chainlib.chain import ChainSpec
from cic_types.models.person import Person from cic_types.models.person import Person
# local imports
from import_util import get_celery_worker_status
logging.basicConfig(level=logging.WARNING) logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger() logg = logging.getLogger()
@ -28,11 +31,14 @@ argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='Chain spe
argparser.add_argument('--redis-host', dest='redis_host', type=str, help='redis host to use for task submission') argparser.add_argument('--redis-host', dest='redis_host', type=str, help='redis host to use for task submission')
argparser.add_argument('--redis-port', dest='redis_port', type=int, help='redis host to use for task submission') argparser.add_argument('--redis-port', dest='redis_port', type=int, help='redis host to use for task submission')
argparser.add_argument('--redis-db', dest='redis_db', type=int, help='redis db to use for task submission and callback') argparser.add_argument('--redis-db', dest='redis_db', type=int, help='redis db to use for task submission and callback')
argparser.add_argument('--batch-size', dest='batch_size', default=100, type=int, help='burst size of sending transactions to node') # batch size should be slightly below cumulative gas limit worth, eg 80000 gas txs with 8000000 limit is a bit less than 100 batch size argparser.add_argument('--batch-size', dest='batch_size', default=100, type=int,
help='burst size of sending transactions to node') # batch size should be slightly below cumulative gas limit worth, eg 80000 gas txs with 8000000 limit is a bit less than 100 batch size
argparser.add_argument('--batch-delay', dest='batch_delay', default=3, type=int, help='seconds delay between batches') argparser.add_argument('--batch-delay', dest='batch_delay', default=3, type=int, help='seconds delay between batches')
argparser.add_argument('--timeout', default=60.0, type=float, help='Callback timeout') argparser.add_argument('--timeout', default=60.0, type=float, help='Callback timeout')
argparser.add_argument('--ussd-host', dest='ussd_host', type=str, help="host to ussd app responsible for processing ussd requests.") argparser.add_argument('--ussd-host', dest='ussd_host', type=str,
argparser.add_argument('--ussd-port', dest='ussd_port', type=str, help="port to ussd app responsible for processing ussd requests.") help="host to ussd app responsible for processing ussd requests.")
argparser.add_argument('--ussd-port', dest='ussd_port', type=str,
help="port to ussd app responsible for processing ussd requests.")
argparser.add_argument('--ussd-no-ssl', dest='ussd_no_ssl', help='do not use ssl (careful)', action='store_true') argparser.add_argument('--ussd-no-ssl', dest='ussd_no_ssl', help='do not use ssl (careful)', action='store_true')
argparser.add_argument('-q', type=str, default='cic-eth', help='Task queue') argparser.add_argument('-q', type=str, default='cic-eth', help='Task queue')
argparser.add_argument('-v', action='store_true', help='Be verbose') argparser.add_argument('-v', action='store_true', help='Be verbose')
@ -49,13 +55,16 @@ config_dir = args.c
config = confini.Config(config_dir, os.environ.get('CONFINI_ENV_PREFIX')) config = confini.Config(config_dir, os.environ.get('CONFINI_ENV_PREFIX'))
config.process() config.process()
args_override = { args_override = {
'CIC_CHAIN_SPEC': getattr(args, 'i'), 'CIC_CHAIN_SPEC': getattr(args, 'i'),
'REDIS_HOST': getattr(args, 'redis_host'), 'REDIS_HOST': getattr(args, 'redis_host'),
'REDIS_PORT': getattr(args, 'redis_port'), 'REDIS_PORT': getattr(args, 'redis_port'),
'REDIS_DB': getattr(args, 'redis_db'), 'REDIS_DB': getattr(args, 'redis_db'),
} }
config.dict_override(args_override, 'cli') config.dict_override(args_override, 'cli')
logg.debug('config loaded from {}:\n{}'.format(args.c, config))
celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL')) celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL'))
get_celery_worker_status(celery_app=celery_app)
redis_host = config.get('REDIS_HOST') redis_host = config.get('REDIS_HOST')
redis_port = config.get('REDIS_PORT') redis_port = config.get('REDIS_PORT')
@ -65,22 +74,22 @@ r = redis.Redis(redis_host, redis_port, redis_db)
ps = r.pubsub() ps = r.pubsub()
user_new_dir = os.path.join(args.user_dir, 'new') user_new_dir = os.path.join(args.user_dir, 'new')
os.makedirs(user_new_dir) os.makedirs(user_new_dir, exist_ok=True)
ussd_data_dir = os.path.join(args.user_dir, 'ussd') ussd_data_dir = os.path.join(args.user_dir, 'ussd')
os.makedirs(ussd_data_dir) os.makedirs(ussd_data_dir, exist_ok=True)
preferences_dir = os.path.join(args.user_dir, 'preferences') preferences_dir = os.path.join(args.user_dir, 'preferences')
os.makedirs(os.path.join(preferences_dir, 'meta')) os.makedirs(os.path.join(preferences_dir, 'meta'), exist_ok=True)
meta_dir = os.path.join(args.user_dir, 'meta') meta_dir = os.path.join(args.user_dir, 'meta')
os.makedirs(meta_dir) os.makedirs(meta_dir, exist_ok=True)
user_old_dir = os.path.join(args.user_dir, 'old') user_old_dir = os.path.join(args.user_dir, 'old')
os.stat(user_old_dir) os.stat(user_old_dir)
txs_dir = os.path.join(args.user_dir, 'txs') txs_dir = os.path.join(args.user_dir, 'txs')
os.makedirs(txs_dir) os.makedirs(txs_dir, exist_ok=True)
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC')) chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
chain_str = str(chain_spec) chain_str = str(chain_spec)
@ -95,6 +104,7 @@ if ussd_no_ssl is True:
else: else:
ussd_ssl = True ussd_ssl = True
def build_ussd_request(phone, host, port, service_code, username, password, ssl=False): def build_ussd_request(phone, host, port, service_code, username, password, ssl=False):
url = 'http' url = 'http'
if ssl: if ssl:
@ -109,13 +119,13 @@ def build_ussd_request(phone, host, port, service_code, username, password, ssl=
session = uuid.uuid4().hex session = uuid.uuid4().hex
data = { data = {
'sessionId': session, 'sessionId': session,
'serviceCode': service_code, 'serviceCode': service_code,
'phoneNumber': phone, 'phoneNumber': phone,
'text': service_code, 'text': service_code,
} }
req = urllib.request.Request(url) req = urllib.request.Request(url)
req.method=('POST') req.method = 'POST'
data_str = urlencode(data) data_str = urlencode(data)
data_bytes = data_str.encode('utf-8') data_bytes = data_str.encode('utf-8')
req.add_header('Content-Type', 'application/x-www-form-urlencoded') req.add_header('Content-Type', 'application/x-www-form-urlencoded')
@ -150,7 +160,7 @@ if __name__ == '__main__':
j = 0 j = 0
for x in os.walk(user_old_dir): for x in os.walk(user_old_dir):
for y in x[2]: for y in x[2]:
if y[len(y)-5:] != '.json': if y[len(y) - 5:] != '.json':
continue continue
# handle json containing person object # handle json containing person object
filepath = os.path.join(x[0], y) filepath = os.path.join(x[0], y)
@ -164,35 +174,35 @@ if __name__ == '__main__':
f.close() f.close()
u = Person.deserialize(o) u = Person.deserialize(o)
new_address = register_ussd(i, u) register_ussd(i, u)
phone_object = phonenumbers.parse(u.tel) phone_object = phonenumbers.parse(u.tel)
phone = phonenumbers.format_number(phone_object, phonenumbers.PhoneNumberFormat.E164) phone = phonenumbers.format_number(phone_object, phonenumbers.PhoneNumberFormat.E164)
s_phone = celery.signature( s_phone = celery.signature(
'import_task.resolve_phone', 'import_task.resolve_phone',
[ [
phone, phone,
], ],
queue='cic-import-ussd', queue='cic-import-ussd',
) )
s_meta = celery.signature( s_meta = celery.signature(
'import_task.generate_metadata', 'import_task.generate_metadata',
[ [
phone, phone,
], ],
queue='cic-import-ussd', queue='cic-import-ussd',
) )
s_balance = celery.signature( s_balance = celery.signature(
'import_task.opening_balance_tx', 'import_task.opening_balance_tx',
[ [
phone, phone,
i, i,
], ],
queue='cic-import-ussd', queue='cic-import-ussd',
) )
s_meta.link(s_balance) s_meta.link(s_balance)
s_phone.link(s_meta) s_phone.link(s_meta)
@ -206,4 +216,3 @@ if __name__ == '__main__':
if j == batch_size: if j == batch_size:
time.sleep(batch_delay) time.sleep(batch_delay)
j = 0 j = 0

View File

@ -2,15 +2,16 @@
import logging import logging
# external imports # external imports
from eth_contract_registry import Registry
from eth_token_index import TokenUniqueSymbolIndex
from chainlib.eth.gas import OverrideGasOracle from chainlib.eth.gas import OverrideGasOracle
from chainlib.eth.nonce import OverrideNonceOracle from chainlib.eth.nonce import OverrideNonceOracle
from eth_erc20 import ERC20
from chainlib.eth.tx import ( from chainlib.eth.tx import (
count, count,
TxFormat, TxFormat,
) )
from celery import Celery
from eth_contract_registry import Registry
from eth_erc20 import ERC20
from eth_token_index import TokenUniqueSymbolIndex
logg = logging.getLogger().getChild(__name__) logg = logging.getLogger().getChild(__name__)
@ -18,10 +19,9 @@ logg = logging.getLogger().getChild(__name__)
class BalanceProcessor: class BalanceProcessor:
def __init__(self, conn, chain_spec, registry_address, signer_address, signer): def __init__(self, conn, chain_spec, registry_address, signer_address, signer):
self.chain_spec = chain_spec self.chain_spec = chain_spec
self.conn = conn self.conn = conn
#self.signer_address = signer_address # self.signer_address = signer_address
self.registry_address = registry_address self.registry_address = registry_address
self.token_index_address = None self.token_index_address = None
@ -35,7 +35,6 @@ class BalanceProcessor:
self.gas_oracle = OverrideGasOracle(conn=conn, limit=8000000) self.gas_oracle = OverrideGasOracle(conn=conn, limit=8000000)
self.value_multiplier = 1 self.value_multiplier = 1
def init(self, token_symbol): def init(self, token_symbol):
# Get Token registry address # Get Token registry address
@ -57,16 +56,25 @@ class BalanceProcessor:
n = tx_factory.parse_decimals(r) n = tx_factory.parse_decimals(r)
self.value_multiplier = 10 ** n self.value_multiplier = 10 ** n
def get_rpc_tx(self, recipient, value, i): def get_rpc_tx(self, recipient, value, i):
logg.debug('initiating nonce offset {} for recipient {}'.format(self.nonce_offset + i, recipient)) logg.debug('initiating nonce offset {} for recipient {}'.format(self.nonce_offset + i, recipient))
nonce_oracle = OverrideNonceOracle(self.signer_address, self.nonce_offset + i) nonce_oracle = OverrideNonceOracle(self.signer_address, self.nonce_offset + i)
tx_factory = ERC20(self.chain_spec, signer=self.signer, nonce_oracle=nonce_oracle, gas_oracle=self.gas_oracle) tx_factory = ERC20(self.chain_spec, signer=self.signer, nonce_oracle=nonce_oracle, gas_oracle=self.gas_oracle)
return tx_factory.transfer(self.token_address, self.signer_address, recipient, value, tx_format=TxFormat.RLP_SIGNED) return tx_factory.transfer(self.token_address, self.signer_address, recipient, value,
#(tx_hash_hex, o) = tx_factory.transfer(self.token_address, self.signer_address, recipient, value) tx_format=TxFormat.RLP_SIGNED)
#self.conn.do(o) # (tx_hash_hex, o) = tx_factory.transfer(self.token_address, self.signer_address, recipient, value)
#return tx_hash_hex # self.conn.do(o)
# return tx_hash_hex
def get_decimal_amount(self, value): def get_decimal_amount(self, value):
return value * self.value_multiplier return value * self.value_multiplier
def get_celery_worker_status(celery_app: Celery):
inspector = celery_app.control.inspect()
availability = inspector.ping()
status = {
'availability': availability,
}
logg.debug(f'RUNNING WITH STATUS: {status}')
return status