From 70704b09ecc0834febc7378fe03984bcde50d497 Mon Sep 17 00:00:00 2001 From: Philip Wafula Date: Sat, 3 Jul 2021 16:55:51 +0000 Subject: [PATCH] Improves import scripts logging --- apps/cic-eth/test_requirements.txt | 1 + .../runnable/daemons/cic_user_tasker.py | 2 + apps/data-seeding/cic_ussd/import_balance.py | 99 +++++++------ apps/data-seeding/cic_ussd/import_pins.py | 8 +- apps/data-seeding/cic_ussd/import_task.py | 135 +++++++++--------- apps/data-seeding/cic_ussd/import_users.py | 91 ++++++------ apps/data-seeding/cic_ussd/import_util.py | 38 +++-- 7 files changed, 202 insertions(+), 172 deletions(-) diff --git a/apps/cic-eth/test_requirements.txt b/apps/cic-eth/test_requirements.txt index c6546ad..bcdef04 100644 --- a/apps/cic-eth/test_requirements.txt +++ b/apps/cic-eth/test_requirements.txt @@ -4,3 +4,4 @@ pytest-mock==3.3.1 pytest-cov==2.10.1 eth-tester==0.5.0b3 py-evm==0.3.0a20 +eth-erc20==0.0.10a2 diff --git a/apps/cic-ussd/cic_ussd/runnable/daemons/cic_user_tasker.py b/apps/cic-ussd/cic_ussd/runnable/daemons/cic_user_tasker.py index 788f71b..69ff554 100644 --- a/apps/cic-ussd/cic_ussd/runnable/daemons/cic_user_tasker.py +++ b/apps/cic-ussd/cic_ussd/runnable/daemons/cic_user_tasker.py @@ -127,6 +127,8 @@ def main(): argv.append('--loglevel=INFO') argv.append('-Q') argv.append(args.q) + argv.append('-n') + argv.append(args.q) current_app.worker_main(argv) diff --git a/apps/data-seeding/cic_ussd/import_balance.py b/apps/data-seeding/cic_ussd/import_balance.py index 5643560..df5bdfe 100644 --- a/apps/data-seeding/cic_ussd/import_balance.py +++ b/apps/data-seeding/cic_ussd/import_balance.py @@ -1,28 +1,22 @@ # standard imports -import os -import sys -import logging import argparse -import hashlib -import redis -import celery +import logging +import sys +import os # external imports +import celery import confini -from chainlib.eth.connection import EthHTTPConnection +import redis from chainlib.chain import ChainSpec -from hexathon import ( - strip_0x, - add_0x, - ) from chainlib.eth.address import to_checksum_address +from chainlib.eth.connection import EthHTTPConnection from crypto_dev_signer.eth.signer import ReferenceSigner as EIP155Signer from crypto_dev_signer.keystore.dict import DictKeystore -from cic_types.models.person import Person # local imports -from import_util import BalanceProcessor -from import_task import * +from import_task import ImportTask, MetadataTask +from import_util import BalanceProcessor, get_celery_worker_status logging.basicConfig(level=logging.WARNING) logg = logging.getLogger() @@ -41,37 +35,41 @@ argparser.add_argument('--meta-port', dest='meta_port', type=int, help='metadata argparser.add_argument('--redis-host', dest='redis_host', type=str, help='redis host to use for task submission') argparser.add_argument('--redis-port', dest='redis_port', type=int, help='redis host to use for task submission') argparser.add_argument('--redis-db', dest='redis_db', type=int, help='redis db to use for task submission and callback') -argparser.add_argument('--token-symbol', default='SRF', type=str, dest='token_symbol', help='Token symbol to use for trnsactions') +argparser.add_argument('--token-symbol', default='GFT', type=str, dest='token_symbol', + help='Token symbol to use for transactions') argparser.add_argument('--head', action='store_true', help='start at current block height (overrides --offset)') -argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration') -argparser.add_argument('-q', type=str, default='cic-eth', help='celery queue to submit transaction tasks to') +argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, + help='environment prefix for variables to overwrite configuration') +argparser.add_argument('-q', type=str, default='cic-import-ussd', help='celery queue to submit transaction tasks to') argparser.add_argument('--offset', type=int, default=0, help='block offset to start syncer from') argparser.add_argument('-v', help='be verbose', action='store_true') argparser.add_argument('-vv', help='be more verbose', action='store_true') argparser.add_argument('user_dir', default='out', type=str, help='user export directory') args = argparser.parse_args(sys.argv[1:]) -if args.v == True: +if args.v: logging.getLogger().setLevel(logging.INFO) -elif args.vv == True: + +elif args.vv: logging.getLogger().setLevel(logging.DEBUG) config_dir = os.path.join(args.c) os.makedirs(config_dir, 0o777, True) config = confini.Config(config_dir, args.env_prefix) config.process() + # override args args_override = { - 'CIC_CHAIN_SPEC': getattr(args, 'i'), - 'ETH_PROVIDER': getattr(args, 'p'), - 'CIC_REGISTRY_ADDRESS': getattr(args, 'r'), - 'REDIS_HOST': getattr(args, 'redis_host'), - 'REDIS_PORT': getattr(args, 'redis_port'), - 'REDIS_DB': getattr(args, 'redis_db'), - 'META_HOST': getattr(args, 'meta_host'), - 'META_PORT': getattr(args, 'meta_port'), - 'KEYSTORE_FILE_PATH': getattr(args, 'y') - } + 'CIC_CHAIN_SPEC': getattr(args, 'i'), + 'ETH_PROVIDER': getattr(args, 'p'), + 'CIC_REGISTRY_ADDRESS': getattr(args, 'r'), + 'REDIS_HOST': getattr(args, 'redis_host'), + 'REDIS_PORT': getattr(args, 'redis_port'), + 'REDIS_DB': getattr(args, 'redis_db'), + 'META_HOST': getattr(args, 'meta_host'), + 'META_PORT': getattr(args, 'meta_port'), + 'KEYSTORE_FILE_PATH': getattr(args, 'y') +} config.dict_override(args_override, 'cli flag') config.censor('PASSWORD', 'DATABASE') config.censor('PASSWORD', 'SSL') @@ -81,14 +79,19 @@ redis_host = config.get('REDIS_HOST') redis_port = config.get('REDIS_PORT') redis_db = config.get('REDIS_DB') r = redis.Redis(redis_host, redis_port, redis_db) -celery_app = celery.Celery(backend=config.get('CELERY_RESULT_URL'), broker=config.get('CELERY_BROKER_URL')) + +# create celery apps +celery_app = celery.Celery(backend=config.get('CELERY_RESULT_URL'), broker=config.get('CELERY_BROKER_URL')) +status = get_celery_worker_status(celery_app=celery_app) signer_address = None keystore = DictKeystore() -if args.y != None: +if args.y is not None: logg.debug('loading keystore file {}'.format(args.y)) signer_address = keystore.import_keystore_file(args.y) logg.debug('now have key for signer address {}'.format(signer_address)) + +# define signer signer = EIP155Signer(keystore) queue = args.q @@ -103,7 +106,7 @@ chain_spec = ChainSpec.from_chain_str(chain_str) old_chain_spec_str = args.old_chain_spec old_chain_spec = ChainSpec.from_chain_str(old_chain_spec_str) -user_dir = args.user_dir # user_out_dir from import_users.py +user_dir = args.user_dir # user_out_dir from import_users.py token_symbol = args.token_symbol @@ -111,20 +114,22 @@ MetadataTask.meta_host = config.get('META_HOST') MetadataTask.meta_port = config.get('META_PORT') ImportTask.chain_spec = chain_spec + def main(): conn = EthHTTPConnection(config.get('ETH_PROVIDER')) - - ImportTask.balance_processor = BalanceProcessor(conn, chain_spec, config.get('CIC_REGISTRY_ADDRESS'), signer_address, signer) + + ImportTask.balance_processor = BalanceProcessor(conn, chain_spec, config.get('CIC_REGISTRY_ADDRESS'), + signer_address, signer) ImportTask.balance_processor.init(token_symbol) # TODO get decimals from token balances = {} f = open('{}/balances.csv'.format(user_dir, 'r')) - remove_zeros = 10**6 + remove_zeros = 10 ** 6 i = 0 while True: l = f.readline() - if l == None: + if l is None: break r = l.split(',') try: @@ -143,15 +148,23 @@ def main(): ImportTask.import_dir = user_dir s = celery.signature( - 'import_task.send_txs', - [ - MetadataTask.balance_processor.nonce_offset, - ], - queue='cic-import-ussd', - ) + 'import_task.send_txs', + [ + MetadataTask.balance_processor.nonce_offset, + ], + queue=queue, + ) s.apply_async() - argv = ['worker', '-Q', 'cic-import-ussd', '--loglevel=DEBUG'] + argv = ['worker'] + if args.vv: + argv.append('--loglevel=DEBUG') + elif args.v: + argv.append('--loglevel=INFO') + argv.append('-Q') + argv.append(args.q) + argv.append('-n') + argv.append(args.q) celery_app.worker_main(argv) diff --git a/apps/data-seeding/cic_ussd/import_pins.py b/apps/data-seeding/cic_ussd/import_pins.py index 119f8d0..827d384 100644 --- a/apps/data-seeding/cic_ussd/import_pins.py +++ b/apps/data-seeding/cic_ussd/import_pins.py @@ -9,7 +9,7 @@ import celery import confini # local imports -from import_task import * +from import_util import get_celery_worker_status logging.basicConfig(level=logging.WARNING) logg = logging.getLogger() @@ -39,9 +39,12 @@ elif args.vv: config_dir = args.c config = confini.Config(config_dir, os.environ.get('CONFINI_ENV_PREFIX')) config.process() +config.censor('PASSWORD', 'DATABASE') logg.debug('config loaded from {}:\n{}'.format(args.c, config)) celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL')) +status = get_celery_worker_status(celery_app=celery_app) + db_configs = { 'database': config.get('DATABASE_NAME'), @@ -61,7 +64,8 @@ def main(): (db_configs, phone_to_pins), queue=args.q ) - s_import_pins.apply_async() + result = s_import_pins.apply_async() + logg.debug(f'TASK: {result.id}, STATUS: {result.status}') if __name__ == '__main__': diff --git a/apps/data-seeding/cic_ussd/import_task.py b/apps/data-seeding/cic_ussd/import_task.py index c591768..2feb9cb 100644 --- a/apps/data-seeding/cic_ussd/import_task.py +++ b/apps/data-seeding/cic_ussd/import_task.py @@ -1,36 +1,33 @@ # standard imports -import os -import logging -import random -import urllib.parse -import urllib.error -import urllib.request import json +import logging +import os +import random +import urllib.error +import urllib.parse +import urllib.request # external imports import celery import psycopg2 -from psycopg2 import extras -from hexathon import ( - strip_0x, - add_0x, - ) from chainlib.eth.address import to_checksum_address from chainlib.eth.tx import ( - unpack, - raw, - ) -from cic_types.processor import generate_metadata_pointer + unpack, + raw, +) from cic_types.models.person import Person +from cic_types.processor import generate_metadata_pointer +from hexathon import ( + strip_0x, + add_0x, +) -#logg = logging.getLogger().getChild(__name__) logg = logging.getLogger() celery_app = celery.current_app class ImportTask(celery.Task): - balances = None import_dir = 'out' count = 0 @@ -38,16 +35,16 @@ class ImportTask(celery.Task): balance_processor = None max_retries = None -class MetadataTask(ImportTask): +class MetadataTask(ImportTask): meta_host = None meta_port = None meta_path = '' meta_ssl = False autoretry_for = ( - urllib.error.HTTPError, - OSError, - ) + urllib.error.HTTPError, + OSError, + ) retry_jitter = True retry_backoff = True retry_backoff_max = 60 @@ -64,12 +61,12 @@ class MetadataTask(ImportTask): def old_address_from_phone(base_path, phone): pidx = generate_metadata_pointer(phone.encode('utf-8'), ':cic.phone') phone_idx_path = os.path.join('{}/phone/{}/{}/{}'.format( - base_path, - pidx[:2], - pidx[2:4], - pidx, - ) - ) + base_path, + pidx[:2], + pidx[2:4], + pidx, + ) + ) f = open(phone_idx_path, 'r') old_address = f.read() f.close() @@ -97,11 +94,11 @@ def generate_metadata(self, address, phone): logg.debug('address {}'.format(address)) old_address_upper = strip_0x(old_address).upper() metadata_path = '{}/old/{}/{}/{}.json'.format( - self.import_dir, - old_address_upper[:2], - old_address_upper[2:4], - old_address_upper, - ) + self.import_dir, + old_address_upper[:2], + old_address_upper[2:4], + old_address_upper, + ) f = open(metadata_path, 'r') o = json.load(f) @@ -116,12 +113,12 @@ def generate_metadata(self, address, phone): new_address_clean = strip_0x(address) filepath = os.path.join( - self.import_dir, - 'new', - new_address_clean[:2].upper(), - new_address_clean[2:4].upper(), - new_address_clean.upper() + '.json', - ) + self.import_dir, + 'new', + new_address_clean[:2].upper(), + new_address_clean[2:4].upper(), + new_address_clean.upper() + '.json', + ) os.makedirs(os.path.dirname(filepath), exist_ok=True) o = u.serialize() @@ -131,10 +128,10 @@ def generate_metadata(self, address, phone): meta_key = generate_metadata_pointer(bytes.fromhex(new_address_clean), ':cic.person') meta_filepath = os.path.join( - self.import_dir, - 'meta', - '{}.json'.format(new_address_clean.upper()), - ) + self.import_dir, + 'meta', + '{}.json'.format(new_address_clean.upper()), + ) os.symlink(os.path.realpath(filepath), meta_filepath) # write ussd data @@ -180,10 +177,8 @@ def generate_metadata(self, address, phone): @celery_app.task(bind=True, base=MetadataTask) def opening_balance_tx(self, address, phone, serial): - - old_address = old_address_from_phone(self.import_dir, phone) - + k = to_checksum_address(strip_0x(old_address)) balance = self.balances[k] logg.debug('found balance {} for address {} phone {}'.format(balance, old_address, phone)) @@ -196,39 +191,39 @@ def opening_balance_tx(self, address, phone, serial): logg.debug('generated tx token value {} to {} tx hash {}'.format(decimal_balance, address, tx_hash_hex)) tx_path = os.path.join( - self.import_dir, - 'txs', - strip_0x(tx_hash_hex), - ) - + self.import_dir, + 'txs', + strip_0x(tx_hash_hex), + ) + f = open(tx_path, 'w') f.write(strip_0x(o)) f.close() tx_nonce_path = os.path.join( - self.import_dir, - 'txs', - '.' + str(tx['nonce']), - ) + self.import_dir, + 'txs', + '.' + str(tx['nonce']), + ) os.symlink(os.path.realpath(tx_path), tx_nonce_path) return tx['hash'] -@celery_app.task(bind=True, base=ImportTask, autoretry_for=(FileNotFoundError,), max_retries=None, default_retry_delay=0.1) +@celery_app.task(bind=True, base=ImportTask, autoretry_for=(FileNotFoundError,), max_retries=None, + default_retry_delay=0.1) def send_txs(self, nonce): - if nonce == self.count + self.balance_processor.nonce_offset: - logg.info('reached nonceĀ {} (offset {} + count {}) exiting'.format(nonce, self.balance_processor.nonce_offset, self.count)) + logg.info('reached nonceĀ {} (offset {} + count {}) exiting'.format(nonce, self.balance_processor.nonce_offset, + self.count)) return - logg.debug('attempt to open symlink for nonce {}'.format(nonce)) tx_nonce_path = os.path.join( - self.import_dir, - 'txs', - '.' + str(nonce), - ) + self.import_dir, + 'txs', + '.' + str(nonce), + ) f = open(tx_nonce_path, 'r') tx_signed_raw_hex = f.read() f.close() @@ -238,21 +233,20 @@ def send_txs(self, nonce): o = raw(add_0x(tx_signed_raw_hex)) tx_hash_hex = self.balance_processor.conn.do(o) - logg.info('sent nonce {} tx hash {}'.format(nonce, tx_hash_hex)) #tx_signed_raw_hex)) + logg.info('sent nonce {} tx hash {}'.format(nonce, tx_hash_hex)) # tx_signed_raw_hex)) nonce += 1 queue = self.request.delivery_info.get('routing_key') s = celery.signature( - 'import_task.send_txs', - [ - nonce, - ], - queue=queue, - ) + 'import_task.send_txs', + [ + nonce, + ], + queue=queue, + ) s.apply_async() - return nonce @@ -310,4 +304,3 @@ def set_ussd_data(config: dict, ussd_data: dict): # close connections db_cursor.close() db_conn.close() - diff --git a/apps/data-seeding/cic_ussd/import_users.py b/apps/data-seeding/cic_ussd/import_users.py index e44b92f..224d090 100644 --- a/apps/data-seeding/cic_ussd/import_users.py +++ b/apps/data-seeding/cic_ussd/import_users.py @@ -17,6 +17,9 @@ import redis from chainlib.chain import ChainSpec from cic_types.models.person import Person +# local imports +from import_util import get_celery_worker_status + logging.basicConfig(level=logging.WARNING) logg = logging.getLogger() @@ -28,11 +31,14 @@ argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='Chain spe argparser.add_argument('--redis-host', dest='redis_host', type=str, help='redis host to use for task submission') argparser.add_argument('--redis-port', dest='redis_port', type=int, help='redis host to use for task submission') argparser.add_argument('--redis-db', dest='redis_db', type=int, help='redis db to use for task submission and callback') -argparser.add_argument('--batch-size', dest='batch_size', default=100, type=int, help='burst size of sending transactions to node') # batch size should be slightly below cumulative gas limit worth, eg 80000 gas txs with 8000000 limit is a bit less than 100 batch size +argparser.add_argument('--batch-size', dest='batch_size', default=100, type=int, + help='burst size of sending transactions to node') # batch size should be slightly below cumulative gas limit worth, eg 80000 gas txs with 8000000 limit is a bit less than 100 batch size argparser.add_argument('--batch-delay', dest='batch_delay', default=3, type=int, help='seconds delay between batches') argparser.add_argument('--timeout', default=60.0, type=float, help='Callback timeout') -argparser.add_argument('--ussd-host', dest='ussd_host', type=str, help="host to ussd app responsible for processing ussd requests.") -argparser.add_argument('--ussd-port', dest='ussd_port', type=str, help="port to ussd app responsible for processing ussd requests.") +argparser.add_argument('--ussd-host', dest='ussd_host', type=str, + help="host to ussd app responsible for processing ussd requests.") +argparser.add_argument('--ussd-port', dest='ussd_port', type=str, + help="port to ussd app responsible for processing ussd requests.") argparser.add_argument('--ussd-no-ssl', dest='ussd_no_ssl', help='do not use ssl (careful)', action='store_true') argparser.add_argument('-q', type=str, default='cic-eth', help='Task queue') argparser.add_argument('-v', action='store_true', help='Be verbose') @@ -49,13 +55,16 @@ config_dir = args.c config = confini.Config(config_dir, os.environ.get('CONFINI_ENV_PREFIX')) config.process() args_override = { - 'CIC_CHAIN_SPEC': getattr(args, 'i'), - 'REDIS_HOST': getattr(args, 'redis_host'), - 'REDIS_PORT': getattr(args, 'redis_port'), - 'REDIS_DB': getattr(args, 'redis_db'), - } + 'CIC_CHAIN_SPEC': getattr(args, 'i'), + 'REDIS_HOST': getattr(args, 'redis_host'), + 'REDIS_PORT': getattr(args, 'redis_port'), + 'REDIS_DB': getattr(args, 'redis_db'), +} config.dict_override(args_override, 'cli') +logg.debug('config loaded from {}:\n{}'.format(args.c, config)) + celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL')) +get_celery_worker_status(celery_app=celery_app) redis_host = config.get('REDIS_HOST') redis_port = config.get('REDIS_PORT') @@ -65,22 +74,22 @@ r = redis.Redis(redis_host, redis_port, redis_db) ps = r.pubsub() user_new_dir = os.path.join(args.user_dir, 'new') -os.makedirs(user_new_dir) +os.makedirs(user_new_dir, exist_ok=True) ussd_data_dir = os.path.join(args.user_dir, 'ussd') -os.makedirs(ussd_data_dir) +os.makedirs(ussd_data_dir, exist_ok=True) preferences_dir = os.path.join(args.user_dir, 'preferences') -os.makedirs(os.path.join(preferences_dir, 'meta')) +os.makedirs(os.path.join(preferences_dir, 'meta'), exist_ok=True) meta_dir = os.path.join(args.user_dir, 'meta') -os.makedirs(meta_dir) +os.makedirs(meta_dir, exist_ok=True) user_old_dir = os.path.join(args.user_dir, 'old') os.stat(user_old_dir) txs_dir = os.path.join(args.user_dir, 'txs') -os.makedirs(txs_dir) +os.makedirs(txs_dir, exist_ok=True) chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC')) chain_str = str(chain_spec) @@ -95,6 +104,7 @@ if ussd_no_ssl is True: else: ussd_ssl = True + def build_ussd_request(phone, host, port, service_code, username, password, ssl=False): url = 'http' if ssl: @@ -109,13 +119,13 @@ def build_ussd_request(phone, host, port, service_code, username, password, ssl= session = uuid.uuid4().hex data = { - 'sessionId': session, - 'serviceCode': service_code, - 'phoneNumber': phone, - 'text': service_code, - } + 'sessionId': session, + 'serviceCode': service_code, + 'phoneNumber': phone, + 'text': service_code, + } req = urllib.request.Request(url) - req.method=('POST') + req.method = 'POST' data_str = urlencode(data) data_bytes = data_str.encode('utf-8') req.add_header('Content-Type', 'application/x-www-form-urlencoded') @@ -150,7 +160,7 @@ if __name__ == '__main__': j = 0 for x in os.walk(user_old_dir): for y in x[2]: - if y[len(y)-5:] != '.json': + if y[len(y) - 5:] != '.json': continue # handle json containing person object filepath = os.path.join(x[0], y) @@ -164,35 +174,35 @@ if __name__ == '__main__': f.close() u = Person.deserialize(o) - new_address = register_ussd(i, u) + register_ussd(i, u) phone_object = phonenumbers.parse(u.tel) phone = phonenumbers.format_number(phone_object, phonenumbers.PhoneNumberFormat.E164) s_phone = celery.signature( - 'import_task.resolve_phone', - [ - phone, - ], - queue='cic-import-ussd', - ) + 'import_task.resolve_phone', + [ + phone, + ], + queue='cic-import-ussd', + ) s_meta = celery.signature( - 'import_task.generate_metadata', - [ - phone, - ], - queue='cic-import-ussd', - ) + 'import_task.generate_metadata', + [ + phone, + ], + queue='cic-import-ussd', + ) s_balance = celery.signature( - 'import_task.opening_balance_tx', - [ - phone, - i, - ], - queue='cic-import-ussd', - ) + 'import_task.opening_balance_tx', + [ + phone, + i, + ], + queue='cic-import-ussd', + ) s_meta.link(s_balance) s_phone.link(s_meta) @@ -206,4 +216,3 @@ if __name__ == '__main__': if j == batch_size: time.sleep(batch_delay) j = 0 - diff --git a/apps/data-seeding/cic_ussd/import_util.py b/apps/data-seeding/cic_ussd/import_util.py index d88b354..563636d 100644 --- a/apps/data-seeding/cic_ussd/import_util.py +++ b/apps/data-seeding/cic_ussd/import_util.py @@ -2,15 +2,16 @@ import logging # external imports -from eth_contract_registry import Registry -from eth_token_index import TokenUniqueSymbolIndex from chainlib.eth.gas import OverrideGasOracle from chainlib.eth.nonce import OverrideNonceOracle -from eth_erc20 import ERC20 from chainlib.eth.tx import ( - count, - TxFormat, - ) + count, + TxFormat, +) +from celery import Celery +from eth_contract_registry import Registry +from eth_erc20 import ERC20 +from eth_token_index import TokenUniqueSymbolIndex logg = logging.getLogger().getChild(__name__) @@ -18,10 +19,9 @@ logg = logging.getLogger().getChild(__name__) class BalanceProcessor: def __init__(self, conn, chain_spec, registry_address, signer_address, signer): - self.chain_spec = chain_spec self.conn = conn - #self.signer_address = signer_address + # self.signer_address = signer_address self.registry_address = registry_address self.token_index_address = None @@ -35,7 +35,6 @@ class BalanceProcessor: self.gas_oracle = OverrideGasOracle(conn=conn, limit=8000000) self.value_multiplier = 1 - def init(self, token_symbol): # Get Token registry address @@ -57,16 +56,25 @@ class BalanceProcessor: n = tx_factory.parse_decimals(r) self.value_multiplier = 10 ** n - def get_rpc_tx(self, recipient, value, i): logg.debug('initiating nonce offset {} for recipient {}'.format(self.nonce_offset + i, recipient)) nonce_oracle = OverrideNonceOracle(self.signer_address, self.nonce_offset + i) tx_factory = ERC20(self.chain_spec, signer=self.signer, nonce_oracle=nonce_oracle, gas_oracle=self.gas_oracle) - return tx_factory.transfer(self.token_address, self.signer_address, recipient, value, tx_format=TxFormat.RLP_SIGNED) - #(tx_hash_hex, o) = tx_factory.transfer(self.token_address, self.signer_address, recipient, value) - #self.conn.do(o) - #return tx_hash_hex - + return tx_factory.transfer(self.token_address, self.signer_address, recipient, value, + tx_format=TxFormat.RLP_SIGNED) + # (tx_hash_hex, o) = tx_factory.transfer(self.token_address, self.signer_address, recipient, value) + # self.conn.do(o) + # return tx_hash_hex def get_decimal_amount(self, value): return value * self.value_multiplier + + +def get_celery_worker_status(celery_app: Celery): + inspector = celery_app.control.inspect() + availability = inspector.ping() + status = { + 'availability': availability, + } + logg.debug(f'RUNNING WITH STATUS: {status}') + return status