From 00643f4cea9872cf9e8ed1dd30f49f0641778694 Mon Sep 17 00:00:00 2001 From: nolash Date: Thu, 18 Feb 2021 00:20:26 +0100 Subject: [PATCH] Make import balance script run indepedently of import users commence --- apps/cic-eth/cic_eth/eth/tx.py | 2 +- .../cic_eth/runnable/daemons/dispatcher.py | 7 +- .../cic_eth/runnable/daemons/filters/gas.py | 10 +- .../runnable/daemons/filters/register.py | 6 +- .../cic_eth/runnable/daemons/filters/tx.py | 2 +- .../cic_eth/runnable/daemons/manager.py | 4 +- apps/cic-eth/cic_eth/sync/mined.py | 7 +- .../scripts/import_balance.py | 111 +++++++++--------- .../scripts/import_users.py | 41 ++++--- .../scripts/requirements.txt | 2 +- apps/contract-migration/scripts/verify.py | 57 ++++++--- 11 files changed, 149 insertions(+), 100 deletions(-) diff --git a/apps/cic-eth/cic_eth/eth/tx.py b/apps/cic-eth/cic_eth/eth/tx.py index c4a3fca6..745ed1d0 100644 --- a/apps/cic-eth/cic_eth/eth/tx.py +++ b/apps/cic-eth/cic_eth/eth/tx.py @@ -405,7 +405,7 @@ def refill_gas(self, recipient_address, chain_str): q = session.query(Otx.tx_hash) q = q.join(TxCache) q = q.filter(Otx.status.op('&')(StatusBits.FINAL.value)==0) - q = q.filter(TxCache.from_value!='0x00') + q = q.filter(TxCache.from_value!=0) q = q.filter(TxCache.recipient==recipient_address) c = q.count() session.close() diff --git a/apps/cic-eth/cic_eth/runnable/daemons/dispatcher.py b/apps/cic-eth/cic_eth/runnable/daemons/dispatcher.py index dfcbdf59..9837585d 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/dispatcher.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/dispatcher.py @@ -91,6 +91,8 @@ run = True class DispatchSyncer: + yield_delay = 0.005 + def __init__(self, chain_spec): self.chain_spec = chain_spec self.chain_id = chain_spec.chain_id() @@ -138,7 +140,10 @@ class DispatchSyncer: txs[k] = utxs[k] self.process(w3, txs) - time.sleep(interval) + if len(utxs) > 0: + time.sleep(self.yield_delay) + else: + time.sleep(interval) def main(): diff --git a/apps/cic-eth/cic_eth/runnable/daemons/filters/gas.py b/apps/cic-eth/cic_eth/runnable/daemons/filters/gas.py index a3cf1d32..81e6242d 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/filters/gas.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/filters/gas.py @@ -5,9 +5,10 @@ import logging from cic_registry.chain import ChainSpec # local imports +from cic_eth.db.enum import StatusBits from cic_eth.db.models.base import SessionBase from cic_eth.db.models.tx import TxCache -from cic_eth.db import Otx +from cic_eth.db.models.otx import Otx from cic_eth.queue.tx import get_paused_txs from cic_eth.eth.task import create_check_gas_and_send_task from .base import SyncFilter @@ -17,8 +18,9 @@ logg = logging.getLogger() class GasFilter(SyncFilter): - def __init__(self, gas_provider): + def __init__(self, queue, gas_provider): self.gas_provider = gas_provider + self.queue = queue def filter(self, w3, tx, rcpt, chain_str): @@ -39,7 +41,7 @@ class GasFilter(SyncFilter): return chain_spec = ChainSpec.from_chain_str(chain_str) - txs = get_paused_txs(StatusEnum.WAITFORGAS, r[0], chain_spec.chain_id()) + txs = get_paused_txs(StatusBits.GAS_ISSUES, r[0], chain_spec.chain_id()) if len(txs) > 0: logg.info('resuming gas-in-waiting txs for {}: {}'.format(r[0], txs.keys())) @@ -49,6 +51,6 @@ class GasFilter(SyncFilter): r[0], 0, tx_hashes_hex=list(txs.keys()), - queue=queue, + queue=self.queue, ) s.apply_async() diff --git a/apps/cic-eth/cic_eth/runnable/daemons/filters/register.py b/apps/cic-eth/cic_eth/runnable/daemons/filters/register.py index 180cdca6..83d60b83 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/filters/register.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/filters/register.py @@ -15,6 +15,10 @@ account_registry_add_log_hash = '0x5ed3bdd47b9af629827a8d129aa39c870b10c03f0153f class RegistrationFilter(SyncFilter): + def __init__(self, queue): + self.queue = queue + + def filter(self, w3, tx, rcpt, chain_spec): logg.debug('applying registration filter') registered_address = None @@ -30,6 +34,6 @@ class RegistrationFilter(SyncFilter): address, str(chain_spec), ], - queue=queue, + queue=self.queue, ) s.apply_async() diff --git a/apps/cic-eth/cic_eth/runnable/daemons/filters/tx.py b/apps/cic-eth/cic_eth/runnable/daemons/filters/tx.py index 6a645ff3..6e0dabd6 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/filters/tx.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/filters/tx.py @@ -25,7 +25,7 @@ class TxFilter(SyncFilter): logg.debug('tx {} not found locally, skipping'.format(tx_hash_hex)) return None logg.info('otx found {}'.format(otx.tx_hash)) - s = celery.siignature( + s = celery.signature( 'cic_eth.queue.tx.set_final_status', [ tx_hash_hex, diff --git a/apps/cic-eth/cic_eth/runnable/daemons/manager.py b/apps/cic-eth/cic_eth/runnable/daemons/manager.py index bb85c466..fd281fff 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/manager.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/manager.py @@ -178,9 +178,9 @@ def main(): tx_filter = TxFilter(queue) - registration_filter = RegistrationFilter() + registration_filter = RegistrationFilter(queue) - gas_filter = GasFilter(c.gas_provider()) + gas_filter = GasFilter(queue, c.gas_provider()) i = 0 for syncer in syncers: diff --git a/apps/cic-eth/cic_eth/sync/mined.py b/apps/cic-eth/cic_eth/sync/mined.py index 616eea54..946f6cb1 100644 --- a/apps/cic-eth/cic_eth/sync/mined.py +++ b/apps/cic-eth/cic_eth/sync/mined.py @@ -23,6 +23,8 @@ class MinedSyncer(Syncer): :type bc_cache: Object implementing methods from cic_eth.sync.SyncerBackend """ + yield_delay = 0.005 + def __init__(self, bc_cache): super(MinedSyncer, self).__init__(bc_cache) self.block_offset = 0 @@ -99,5 +101,8 @@ class MinedSyncer(Syncer): block_number = self.process(c.w3, block.hex()) logg.info('processed block {} {}'.format(block_number, block.hex())) self.bc_cache.disconnect() - time.sleep(interval) + if len(e) > 0: + time.sleep(self.yield_delay) + else: + time.sleep(interval) logg.info("Syncer no longer set to run, gracefully exiting") diff --git a/apps/contract-migration/scripts/import_balance.py b/apps/contract-migration/scripts/import_balance.py index 7c6ce3ef..ac5cd9b6 100644 --- a/apps/contract-migration/scripts/import_balance.py +++ b/apps/contract-migration/scripts/import_balance.py @@ -17,7 +17,6 @@ from hexathon import ( strip_0x, add_0x, ) -from cic_registry.chain import ChainSpec from chainsyncer.backend import MemBackend from chainsyncer.driver import HeadSyncer from chainlib.eth.connection import HTTPConnection @@ -34,6 +33,7 @@ from chainlib.eth.nonce import DefaultNonceOracle from chainlib.eth.tx import TxFactory from chainlib.eth.rpc import jsonrpc_template from chainlib.eth.error import EthException +from chainlib.chain import ChainSpec from crypto_dev_signer.eth.signer import ReferenceSigner as EIP155Signer from crypto_dev_signer.keystore import DictKeystore from cic_types.models.person import Person @@ -49,7 +49,6 @@ argparser.add_argument('-p', '--provider', dest='p', type=str, help='chain rpc p argparser.add_argument('-y', '--key-file', dest='y', type=str, help='Ethereum keystore file to use for signing') argparser.add_argument('-c', type=str, default=config_dir, help='config root to use') argparser.add_argument('-i', '--chain-spec', type=str, dest='i', help='chain spec') -argparser.add_argument('-a', '--index-address', type=str, dest='a', help='account index contract address') argparser.add_argument('-r', '--registry-address', type=str, dest='r', help='CIC Registry address') argparser.add_argument('--head', action='store_true', help='start at current block height (overrides --offset)') argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration') @@ -97,9 +96,8 @@ if args.head: block_offset = -1 else: block_offset = args.offset -account_index_address = args.a -chain_spec = ChainSpec.from_chain_str(chain_str) +chain_spec = ChainSpec.from_chain_str('evm:' + chain_str) user_dir = args.user_dir # user_out_dir from import_users.py @@ -108,45 +106,51 @@ class Handler: account_index_add_signature = keccak256_string_to_hex('add(address)')[:8] - def __init__(self, conn, user_dir, addresses, balances, token_address): - self.conn = conn + def __init__(self, conn, chain_spec, user_dir, balances, token_address, signer, gas_oracle, nonce_oracle): self.token_address = token_address self.user_dir = user_dir - self.addresses = addresses self.balances = balances + self.chain_spec = chain_spec + self.tx_factory = ERC20TxFactory(signer, gas_oracle, nonce_oracle, chain_spec.network_id()) def name(self): return 'balance_handler' - def handle(self, getter, block, tx): - try: - if tx.payload[:8] == self.account_index_add_signature: - recipient = eth_abi.decode_single('address', bytes.fromhex(tx.payload[-64:])) - original_address = to_checksum(self.addresses[to_checksum(recipient)]) - user_file = '{}/{}/{}.json'.format( - recipient[2:4].upper(), - recipient[4:6].upper(), - recipient[2:].upper(), - ) - filepath = os.path.join(self.user_dir, user_file) - f = open(filepath, 'r') - o = json.load(f) - f.close() - u = Person(o) - balance = self.balances[original_address] - logg.info('registered {} originally {} ({}) tx hash {} balance {}'.format(recipient, original_address, u, tx.hash, balance)) + def filter(self, conn, block, tx): + if tx.payload == None or len(tx.payload) == 0: + logg.debug('no payload, skipping {}'.format(tx)) + return - (tx_hash_hex, o) = getter.tx_factory.erc20_transfer(self.token_address, signer_address, recipient, balance) - logg.info('submitting erc20 transfer tx {} for recipient {}'.format(tx_hash_hex, recipient)) - r = self.conn.do(o) - except TypeError: - pass - except IndexError: - pass - except EthException as e: - logg.error('send error {}'.format(e).ljust(200)) + if tx.payload[:8] == self.account_index_add_signature: + recipient = eth_abi.decode_single('address', bytes.fromhex(tx.payload[-64:])) + #original_address = to_checksum(self.addresses[to_checksum(recipient)]) + user_file = 'new/{}/{}/{}.json'.format( + recipient[2:4].upper(), + recipient[4:6].upper(), + recipient[2:].upper(), + ) + filepath = os.path.join(self.user_dir, user_file) + f = open(filepath, 'r') + o = json.load(f) + f.close() + u = Person(o) + original_address = u.identities['evm']['xdai:1'][0] + balance = self.balances[original_address] + logg.info('registered {} originally {} ({}) tx hash {} balance {}'.format(recipient, original_address, u, tx.hash, balance)) + + (tx_hash_hex, o) = self.tx_factory.erc20_transfer(self.token_address, signer_address, recipient, balance) + logg.info('submitting erc20 transfer tx {} for recipient {}'.format(tx_hash_hex, recipient)) + r = conn.do(o) +# except TypeError as e: +# logg.warning('typerror {}'.format(e)) +# pass +# except IndexError as e: +# logg.warning('indexerror {}'.format(e)) +# pass +# except EthException as e: +# logg.error('send error {}'.format(e).ljust(200)) #except KeyError as e: # logg.error('key error {}'.format(e).ljust(200)) @@ -173,7 +177,7 @@ class BlockGetter: def progress_callback(s, block_number, tx_index): - sys.stdout.write(s.ljust(200) + "\r") + sys.stdout.write(s.ljust(200) + "\n") @@ -185,7 +189,7 @@ def main(): nonce_oracle = DefaultNonceOracle(signer_address, conn) # Get Token registry address - txf = TxFactory(signer=signer, gas_oracle=gas_oracle, nonce_oracle=None, chain_id=chain_spec.chain_id()) + txf = TxFactory(signer=signer, gas_oracle=gas_oracle, nonce_oracle=None, chain_id=chain_spec.network_id()) tx = txf.template(signer_address, config.get('CIC_REGISTRY_ADDRESS')) registry_addressof_method = keccak256_string_to_hex('addressOf(bytes32)')[:8] @@ -220,29 +224,28 @@ def main(): logg.info('found token address {}'.format(sarafu_token_address)) - getter = BlockGetter(conn, gas_oracle, nonce_oracle, chain_spec.chain_id()) syncer_backend = MemBackend(chain_str, 0) if block_offset == -1: o = block_latest() r = conn.do(o) block_offset = int(strip_0x(r), 16) + 1 - - addresses = {} - f = open('{}/addresses.csv'.format(user_dir, 'r')) - while True: - l = f.readline() - if l == None: - break - r = l.split(',') - try: - k = r[0] - v = r[1].rstrip() - addresses[k] = v - sys.stdout.write('loading address mapping {} -> {}'.format(k, v).ljust(200) + "\r") - except IndexError as e: - break - f.close() +# +# addresses = {} +# f = open('{}/addresses.csv'.format(user_dir, 'r')) +# while True: +# l = f.readline() +# if l == None: +# break +# r = l.split(',') +# try: +# k = r[0] +# v = r[1].rstrip() +# addresses[k] = v +# sys.stdout.write('loading address mapping {} -> {}'.format(k, v).ljust(200) + "\r") +# except IndexError as e: +# break +# f.close() balances = {} f = open('{}/balances.csv'.format(user_dir, 'r')) @@ -266,9 +269,9 @@ def main(): syncer_backend.set(block_offset, 0) syncer = HeadSyncer(syncer_backend, progress_callback=progress_callback) - handler = Handler(conn, user_dir, addresses, balances, sarafu_token_address) + handler = Handler(conn, chain_spec, user_dir, balances, sarafu_token_address, signer, gas_oracle, nonce_oracle) syncer.add_filter(handler) - syncer.loop(1, getter) + syncer.loop(1, conn) if __name__ == '__main__': diff --git a/apps/contract-migration/scripts/import_users.py b/apps/contract-migration/scripts/import_users.py index 5addde33..70b8feab 100644 --- a/apps/contract-migration/scripts/import_users.py +++ b/apps/contract-migration/scripts/import_users.py @@ -6,7 +6,7 @@ import logging import argparse import uuid import datetime -import shutil +import time from glob import glob # third-party imports @@ -35,6 +35,8 @@ argparser.add_argument('--redis-port', dest='redis_port', type=int, help='redis argparser.add_argument('--redis-db', dest='redis_db', type=int, help='redis db to use for task submission and callback') argparser.add_argument('--redis-host-callback', dest='redis_host_callback', default='localhost', type=str, help='redis host to use for callback') argparser.add_argument('--redis-port-callback', dest='redis_port_callback', default=6379, type=int, help='redis port to use for callback') +argparser.add_argument('--batch-size', dest='batch_size', default=50, type=int, help='burst size of sending transactions to node') +argparser.add_argument('--batch-delay', dest='batch_delay', default=2, type=int, help='seconds delay between batches') argparser.add_argument('--timeout', default=20.0, type=float, help='Callback timeout') argparser.add_argument('-q', type=str, default='cic-eth', help='Task queue') argparser.add_argument('-v', action='store_true', help='Be verbose') @@ -67,18 +69,16 @@ r = redis.Redis(redis_host, redis_port, redis_db) ps = r.pubsub() user_dir = args.user_dir -user_out_dir = '{}_cic_eth'.format(user_dir) -os.makedirs(user_out_dir) -shutil.copy( - os.path.join(user_dir, 'balances.csv'), - os.path.join(user_out_dir, 'balances.csv'), - ) +os.makedirs(os.path.join(user_dir, 'new')) chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC')) chain_str = str(chain_spec) +batch_size = args.batch_size +batch_delay = args.batch_delay -def register_eth(u): + +def register_eth(i, u): redis_channel = str(uuid.uuid4()) ps.subscribe(redis_channel) ps.get_message() @@ -94,7 +94,7 @@ def register_eth(u): ps.get_message() m = ps.get_message(timeout=args.timeout) address = json.loads(m['data']) - logg.debug('register eth {} {}'.format(u, address)) + logg.debug('[{}] register eth {} {}'.format(i, u, address)) return address @@ -105,10 +105,13 @@ def register_ussd(u): if __name__ == '__main__': - fi = open(os.path.join(user_out_dir, 'addresses.csv'), 'a') + #fi = open(os.path.join(user_out_dir, 'addresses.csv'), 'a') i = 0 - for x in os.walk(user_dir): + j = 0 + user_new_dir = os.path.join(user_dir, 'new') + user_old_dir = os.path.join(user_dir, 'old') + for x in os.walk(user_old_dir): for y in x[2]: if y[len(y)-5:] != '.json': continue @@ -123,19 +126,18 @@ if __name__ == '__main__': f.close() u = Person(o) - new_address = register_eth(u) + new_address = register_eth(i, u) u.identities['evm'][chain_str] = [new_address] register_ussd(u) new_address_clean = strip_0x(new_address) filepath = os.path.join( - user_out_dir, + user_new_dir, new_address_clean[:2].upper(), new_address_clean[2:4].upper(), new_address_clean.upper() + '.json', ) - logg.debug('outpath {}'.format(filepath)) os.makedirs(os.path.dirname(filepath), exist_ok=True) o = u.serialize() @@ -143,10 +145,15 @@ if __name__ == '__main__': f.write(json.dumps(o)) f.close() - old_address = to_checksum(add_0x(y[:len(y)-5])) - fi.write('{},{}\n'.format(new_address, old_address)) + #old_address = to_checksum(add_0x(y[:len(y)-5])) + #fi.write('{},{}\n'.format(new_address, old_address)) i += 1 sys.stdout.write('imported {} {}'.format(i, u).ljust(200) + "\r") + + j += 1 + if j == batch_size: + time.sleep(batch_delay) + j = 0 - fi.close() + #fi.close() diff --git a/apps/contract-migration/scripts/requirements.txt b/apps/contract-migration/scripts/requirements.txt index a1e49c90..9af41fcd 100644 --- a/apps/contract-migration/scripts/requirements.txt +++ b/apps/contract-migration/scripts/requirements.txt @@ -1,6 +1,6 @@ psycopg2==2.8.6 cic-types==0.1.0a4 -chainlib~=0.0.1a13 +chainlib~=0.0.1a14 chainsyncer==0.0.1a7 cic-eth==0.10.0a27 confini==0.3.6b2 diff --git a/apps/contract-migration/scripts/verify.py b/apps/contract-migration/scripts/verify.py index d1543c26..f860cc15 100644 --- a/apps/contract-migration/scripts/verify.py +++ b/apps/contract-migration/scripts/verify.py @@ -39,6 +39,7 @@ from chainlib.eth.error import EthException from crypto_dev_signer.eth.signer import ReferenceSigner as EIP155Signer from crypto_dev_signer.keystore import DictKeystore from cic_eth.api.api_admin import AdminApi +from cic_types.models.person import Person logging.basicConfig(level=logging.WARNING) logg = logging.getLogger() @@ -160,6 +161,7 @@ def main(): o['params'].append(txf.normalize(tx)) o['params'].append('latest') r = conn.do(o) + print('r {}'.format(r)) token_index_address = to_checksum(eth_abi.decode_single('address', bytes.fromhex(strip_0x(r)))) logg.info('found token index address {}'.format(token_index_address)) @@ -193,21 +195,21 @@ def main(): sarafu_token_address = to_checksum(eth_abi.decode_single('address', bytes.fromhex(strip_0x(r)))) logg.info('found token address {}'.format(sarafu_token_address)) - addresses = {} - f = open('{}/addresses.csv'.format(user_dir, 'r')) - while True: - l = f.readline() - if l == None: - break - r = l.split(',') - try: - k = r[0] - v = r[1].rstrip() - addresses[k] = v - sys.stdout.write('loading address mapping {} -> {}'.format(k, v).ljust(200) + "\r") - except IndexError as e: - break - f.close() +# addresses = {} +# f = open('{}/addresses.csv'.format(user_dir, 'r')) +# while True: +# l = f.readline() +# if l == None: +# break +# r = l.split(',') +# try: +# k = r[0] +# v = r[1].rstrip() +# addresses[k] = v +# sys.stdout.write('loading address mapping {} -> {}'.format(k, v).ljust(200) + "\r") +# except IndexError as e: +# break +# f.close() balances = {} f = open('{}/balances.csv'.format(user_dir, 'r')) @@ -232,8 +234,29 @@ def main(): api = AdminApi(MockClient()) verifier = Verifier(conn, api, gas_oracle, chain_spec, account_index_address, sarafu_token_address) - for k in addresses.keys(): - verifier.verify(k, balances[addresses[k]]) + + user_new_dir = os.path.join(user_dir, 'new') + for x in os.walk(user_new_dir): + for y in x[2]: + if y[len(y)-5:] != '.json': + continue + filepath = os.path.join(x[0], y) + f = open(filepath, 'r') + try: + o = json.load(f) + except json.decoder.JSONDecodeError as e: + f.close() + logg.error('load error for {}: {}'.format(y, e)) + continue + f.close() + u = Person(o) + + new_address = u.identities['evm'][chain_str][0] + old_address = u.identities['evm']['xdai:1'][0] + balance = balances[old_address] + logg.debug('checking {} -> {} = {}'.format(old_address, new_address, balance)) + + verifier.verify(new_address, balance) if __name__ == '__main__':