From 462933d8ae4299aac04636dd3bf200de67f40ca9 Mon Sep 17 00:00:00 2001 From: Louis Holbrook Date: Thu, 1 Apr 2021 20:55:39 +0000 Subject: [PATCH 1/2] Remove custodial dependencies from sovereign import scripts --- .../cic_eth/runnable/daemons/dispatcher.py | 2 +- .../scripts/create_import_users.py | 4 - .../scripts/import_sovereign_users.py | 10 +- apps/contract-migration/scripts/verify.py | 117 +++++++++++++----- docker-compose.yml | 2 +- 5 files changed, 91 insertions(+), 44 deletions(-) diff --git a/apps/cic-eth/cic_eth/runnable/daemons/dispatcher.py b/apps/cic-eth/cic_eth/runnable/daemons/dispatcher.py index 1465b3b7..9f4ef332 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/dispatcher.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/dispatcher.py @@ -88,7 +88,7 @@ run = True class DispatchSyncer: - yield_delay = 0.005 + yield_delay = 0.0005 def __init__(self, chain_spec): self.chain_spec = chain_spec diff --git a/apps/contract-migration/scripts/create_import_users.py b/apps/contract-migration/scripts/create_import_users.py index d8587ee0..e46d9e48 100644 --- a/apps/contract-migration/scripts/create_import_users.py +++ b/apps/contract-migration/scripts/create_import_users.py @@ -17,9 +17,7 @@ import random import vobject import celery from faker import Faker -import cic_eth_registry import confini -from cic_eth.api import Api from cic_types.models.person import ( Person, generate_vcard_from_contact_data, @@ -62,8 +60,6 @@ ts_then = int(dt_then.timestamp()) celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL')) -api = Api(config.get('CIC_CHAIN_SPEC')) - gift_max = args.gift_threshold or 0 gift_factor = (10**6) diff --git a/apps/contract-migration/scripts/import_sovereign_users.py b/apps/contract-migration/scripts/import_sovereign_users.py index 39f4143a..30e3f5db 100644 --- a/apps/contract-migration/scripts/import_sovereign_users.py +++ b/apps/contract-migration/scripts/import_sovereign_users.py @@ -23,7 +23,7 @@ from chainlib.eth.gas import RPCGasOracle from chainlib.eth.nonce import RPCNonceOracle from cic_types.processor import generate_metadata_pointer from eth_accounts_index import AccountRegistry -from cic_eth_registry import CICRegistry +from contract_registry import Registry from crypto_dev_signer.keystore.dict import DictKeystore from crypto_dev_signer.eth.signer.defaultsigner import ReferenceSigner as EIP155Signer from crypto_dev_signer.keystore.keyfile import to_dict as to_keyfile_dict @@ -88,9 +88,11 @@ signer = EIP155Signer(keystore) nonce_oracle = RPCNonceOracle(signer_address, rpc) -CICRegistry.address = config.get('CIC_REGISTRY_ADDRESS') -registry = CICRegistry(chain_spec, rpc) -account_registry_address = registry.by_name('AccountRegistry') +registry = Registry() +o = registry.address_of(config.get('CIC_REGISTRY_ADDRESS'), 'AccountRegistry') +r = rpc.do(o) +account_registry_address = registry.parse_address_of(r) +logg.info('using account registry {}'.format(account_registry_address)) keyfile_dir = os.path.join(config.get('_USERDIR'), 'keystore') os.makedirs(keyfile_dir) diff --git a/apps/contract-migration/scripts/verify.py b/apps/contract-migration/scripts/verify.py index 0c7ddefa..08b791a7 100644 --- a/apps/contract-migration/scripts/verify.py +++ b/apps/contract-migration/scripts/verify.py @@ -10,6 +10,7 @@ import hashlib import csv import json import urllib +import copy # external imports import celery @@ -39,7 +40,6 @@ from chainlib.eth.gas import ( from chainlib.eth.tx import TxFactory from chainlib.eth.rpc import jsonrpc_template from chainlib.eth.error import EthException -from cic_eth.api.api_admin import AdminApi from cic_types.models.person import ( Person, generate_metadata_pointer, @@ -51,12 +51,27 @@ logg = logging.getLogger() config_dir = '/usr/local/etc/cic-syncer' +custodial_tests = [ + 'local_key', + 'gas', + 'faucet', + ] + +all_tests = custodial_tests + [ + 'accounts_index', + 'balance', + 'metadata', + ] + argparser = argparse.ArgumentParser(description='daemon that monitors transactions in new blocks') argparser.add_argument('-p', '--provider', dest='p', type=str, help='chain rpc provider address') argparser.add_argument('-c', type=str, default=config_dir, help='config root to use') argparser.add_argument('--old-chain-spec', type=str, dest='old_chain_spec', default='evm:oldchain:1', help='chain spec') argparser.add_argument('-i', '--chain-spec', type=str, dest='i', help='chain spec') argparser.add_argument('--meta-provider', type=str, dest='meta_provider', default='http://localhost:63380', help='cic-meta url') +argparser.add_argument('--skip-custodial', dest='skip_custodial', action='store_true', help='skip all custodial verifications') +argparser.add_argument('--exclude', action='append', type=str, default=[], help='skip specified verification') +argparser.add_argument('--include', action='append', type=str, help='include specified verification') argparser.add_argument('-r', '--registry-address', type=str, dest='r', help='CIC Registry address') argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration') argparser.add_argument('-x', '--exit-on-error', dest='x', action='store_true', help='Halt exection on error') @@ -95,14 +110,61 @@ user_dir = args.user_dir # user_out_dir from import_users.py meta_url = args.meta_provider exit_on_error = args.x +active_tests = [] +exclude = [] +include = args.include +if args.include == None: + include = all_tests +for t in args.exclude: + if t not in all_tests: + raise ValueError('Cannot exclude unknown verification "{}"'.format(t)) + exclude.append(t) +if args.skip_custodial: + logg.info('will skip all custodial verifications ({})'.format(','.join(custodial_tests))) + for t in custodial_tests: + if t not in exclude: + exclude.append(t) +for t in include: + if t not in all_tests: + raise ValueError('Cannot include unknown verification "{}"'.format(t)) + if t not in exclude: + active_tests.append(t) + logg.info('will perform verification "{}"'.format(t)) + +api = None +for t in custodial_tests: + if t in active_tests: + from cic_eth.api.api_admin import AdminApi + api = AdminApi(None) + logg.info('activating custodial module'.format(t)) + break + +cols = os.get_terminal_size().columns + + +def to_terminalwidth(s): + ss = s.ljust(int(cols)-1) + ss += "\r" + return ss + +def default_outfunc(s): + ss = to_terminalwidth(s) + sys.stdout.write(ss) +outfunc = default_outfunc +if logg.isEnabledFor(logging.DEBUG): + outfunc = logg.debug + class VerifierState: - def __init__(self, item_keys): + def __init__(self, item_keys, active_tests=None): self.items = {} for k in item_keys: - logg.info('k {}'.format(k)) self.items[k] = 0 + if active_tests == None: + self.active_tests = copy.copy(item_keys) + else: + self.active_tests = copy.copy(active_tests) def poke(self, item_key): @@ -112,7 +174,10 @@ class VerifierState: def __str__(self): r = '' for k in self.items.keys(): - r += '{}: {}\n'.format(k, self.items[k]) + if k in self.active_tests: + r += '{}: {}\n'.format(k, self.items[k]) + else: + r += '{}: skipped\n'.format(k) return r @@ -148,10 +213,10 @@ class Verifier: verifymethods = [] for k in dir(self): if len(k) > 7 and k[:7] == 'verify_': - logg.info('adding verify method {}'.format(k)) + logg.debug('verifier has verify method {}'.format(k)) verifymethods.append(k[7:]) - self.state = VerifierState(verifymethods) + self.state = VerifierState(verifymethods, active_tests=active_tests) def verify_accounts_index(self, address, balance=None): @@ -233,26 +298,14 @@ class Verifier: raise VerifierError(o_retrieved, 'metadata (person)') - def verify(self, address, balance): - logg.debug('verify {} {}'.format(address, balance)) + def verify(self, address, balance, debug_stem=None): - methods = [ - 'local_key', - 'accounts_index', - 'balance', - 'metadata', - 'gas', - 'faucet', - ] - - for k in methods: + for k in active_tests: + s = '{} {}'.format(debug_stem, k) + outfunc(s) try: m = getattr(self, 'verify_{}'.format(k)) m(address, balance) -# self.verify_local_key(address) -# self.verify_accounts_index(address) -# self.verify_balance(address, balance) -# self.verify_metadata(address) except VerifierError as e: logline = 'verification {} failed for {}: {}'.format(k, address, str(e)) if self.exit_on_error: @@ -266,10 +319,6 @@ class Verifier: return str(self.state) -class MockClient: - - w3 = None - def main(): global chain_str, block_offset, user_dir @@ -291,7 +340,6 @@ def main(): o['params'].append(txf.normalize(tx)) o['params'].append('latest') r = conn.do(o) - print('r {}'.format(r)) token_index_address = to_checksum_address(eth_abi.decode_single('address', bytes.fromhex(strip_0x(r)))) logg.info('found token index address {}'.format(token_index_address)) @@ -320,6 +368,7 @@ def main(): logg.info('found faucet {}'.format(faucet_address)) + # Get Sarafu token address tx = txf.template(ZERO_ADDRESS, token_index_address) data = add_0x(registry_addressof_method) @@ -333,7 +382,6 @@ def main(): o['params'].append(txf.normalize(tx)) o['params'].append('latest') r = conn.do(o) - print('r {}'.format(r)) sarafu_token_address = to_checksum_address(eth_abi.decode_single('address', bytes.fromhex(strip_0x(r)))) logg.info('found token address {}'.format(sarafu_token_address)) @@ -348,7 +396,7 @@ def main(): try: address = to_checksum_address(r[0]) #sys.stdout.write('loading balance {} {}'.format(i, address).ljust(200) + "\r") - logg.debug('loading balance {} {}'.format(i, address).ljust(200)) + outfunc('loading balance {} {}'.format(i, address)) #.ljust(200)) except ValueError: break balance = int(r[1].rstrip()) @@ -357,11 +405,10 @@ def main(): f.close() - api = AdminApi(MockClient()) - verifier = Verifier(conn, api, gas_oracle, chain_spec, account_index_address, sarafu_token_address, faucet_address, user_dir, exit_on_error) user_new_dir = os.path.join(user_dir, 'new') + i = 0 for x in os.walk(user_new_dir): for y in x[2]: if y[len(y)-5:] != '.json': @@ -377,7 +424,7 @@ def main(): f.close() u = Person.deserialize(o) - logg.debug('data {}'.format(u.identities['evm'])) + #logg.debug('data {}'.format(u.identities['evm'])) subchain_str = '{}:{}'.format(chain_spec.common_name(), chain_spec.network_id()) new_address = u.identities['evm'][subchain_str][0] @@ -388,9 +435,11 @@ def main(): balance = balances[old_address] except KeyError: logg.info('no old balance found for {}, assuming 0'.format(old_address)) - logg.debug('checking {} -> {} = {}'.format(old_address, new_address, balance)) - verifier.verify(new_address, balance) + s = 'checking {}: {} -> {} = {}'.format(i, old_address, new_address, balance) + + verifier.verify(new_address, balance, debug_stem=s) + i += 1 print(verifier) diff --git a/docker-compose.yml b/docker-compose.yml index 6fa6e8a1..5adf741f 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -373,7 +373,7 @@ services: - -c - | if [[ -f /tmp/cic/config/.env ]]; then source /tmp/cic/config/.env; fi - ./start_retry.sh -v + ./start_retry.sh -vv # command: "/root/start_retry.sh -q cic-eth -vv" From 810f9fe99409791a23152fa4d85649fd043db61e Mon Sep 17 00:00:00 2001 From: Louis Holbrook Date: Fri, 2 Apr 2021 13:16:27 +0000 Subject: [PATCH 2/2] Rehabilitate retrier --- apps/cic-eth/cic_eth/api/api_admin.py | 19 ++- apps/cic-eth/cic_eth/eth/tx.py | 59 +++++--- apps/cic-eth/cic_eth/queue/tx.py | 13 +- .../cic-eth/cic_eth/runnable/daemons/retry.py | 140 ++++++++++++------ apps/cic-eth/cic_eth/runnable/resend.py | 40 +---- apps/cic-eth/tests/task/test_task_tx.py | 44 ++++++ .../scripts/import_users.py | 2 +- docker-compose.yml | 8 +- 8 files changed, 212 insertions(+), 113 deletions(-) diff --git a/apps/cic-eth/cic_eth/api/api_admin.py b/apps/cic-eth/cic_eth/api/api_admin.py index bcc6174e..458b03fb 100644 --- a/apps/cic-eth/cic_eth/api/api_admin.py +++ b/apps/cic-eth/cic_eth/api/api_admin.py @@ -134,7 +134,8 @@ class AdminApi: return s_have.apply_async() - def resend(self, tx_hash_hex, chain_str, in_place=True, unlock=False): + def resend(self, tx_hash_hex, chain_spec, in_place=True, unlock=False): + logg.debug('resend {}'.format(tx_hash_hex)) s_get_tx_cache = celery.signature( 'cic_eth.queue.tx.get_tx_cache', @@ -156,7 +157,7 @@ class AdminApi: s = celery.signature( 'cic_eth.eth.tx.resend_with_higher_gas', [ - chain_str, + chain_spec.asdict(), None, 1.01, ], @@ -176,7 +177,7 @@ class AdminApi: s_gas = celery.signature( 'cic_eth.admin.ctrl.unlock_send', [ - chain_str, + chain_spec.asdict(), tx_dict['sender'], ], queue=self.queue, @@ -218,7 +219,7 @@ class AdminApi: blocking_tx = k blocking_nonce = nonce_otx elif nonce_otx - last_nonce > 1: - logg.error('nonce gap; {} followed {}'.format(nonce_otx, last_nonce)) + logg.error('nonce gap; {} followed {} for account {}'.format(nonce_otx, last_nonce, tx['from'])) blocking_tx = k blocking_nonce = nonce_otx break @@ -312,10 +313,10 @@ class AdminApi: tx_dict = s.apply_async().get() if tx_dict['sender'] == address: if tx_dict['nonce'] - last_nonce > 1: - logg.error('nonce gap; {} followed {} for tx {}'.format(tx_dict['nonce'], last_nonce, tx_dict['hash'])) + logg.error('nonce gap; {} followed {} for address {} tx {}'.format(tx_dict['nonce'], last_nonce, tx_dict['sender'], tx_hash)) errors.append('nonce') elif tx_dict['nonce'] == last_nonce: - logg.warning('nonce {} duplicate in tx {}'.format(tx_dict['nonce'], tx_dict['hash'])) + logg.info('nonce {} duplicate for address {} in tx {}'.format(tx_dict['nonce'], tx_dict['sender'], tx_hash)) last_nonce = tx_dict['nonce'] if not include_sender: logg.debug('skipping sender tx {}'.format(tx_dict['tx_hash'])) @@ -480,15 +481,17 @@ class AdminApi: tx['destination_token_symbol'] = destination_token.symbol() tx['recipient_token_balance'] = source_token.function('balanceOf')(tx['recipient']).call() - tx['network_status'] = 'Not submitted' + # TODO: this can mean either not subitted or culled, need to check other txs with same nonce to determine which + tx['network_status'] = 'Not in node' r = None try: o = transaction(tx_hash) r = self.rpc.do(o) + if r != None: + tx['network_status'] = 'Mempool' except Exception as e: logg.warning('(too permissive exception handler, please fix!) {}'.format(e)) - tx['network_status'] = 'Mempool' if r != None: try: diff --git a/apps/cic-eth/cic_eth/eth/tx.py b/apps/cic-eth/cic_eth/eth/tx.py index aef7d537..a930b847 100644 --- a/apps/cic-eth/cic_eth/eth/tx.py +++ b/apps/cic-eth/cic_eth/eth/tx.py @@ -7,7 +7,10 @@ import requests from chainlib.eth.constant import ZERO_ADDRESS from chainlib.chain import ChainSpec from chainlib.eth.address import is_checksum_address -from chainlib.eth.gas import balance +from chainlib.eth.gas import ( + balance, + price, + ) from chainlib.eth.error import ( EthException, NotFoundEthException, @@ -17,11 +20,15 @@ from chainlib.eth.tx import ( receipt, raw, TxFormat, + TxFactory, unpack, ) from chainlib.connection import RPCConnection from chainlib.hash import keccak256_hex_to_hex -from chainlib.eth.gas import Gas +from chainlib.eth.gas import ( + Gas, + OverrideGasOracle, + ) from chainlib.eth.contract import ( abi_decode_single, ABIContractType, @@ -52,6 +59,7 @@ from cic_eth.queue.tx import ( get_tx, register_tx, get_nonce_tx, + create as queue_create, ) from cic_eth.error import OutOfGasError from cic_eth.error import LockedError @@ -370,7 +378,7 @@ def refill_gas(self, recipient_address, chain_spec_dict): @celery_app.task(bind=True, base=CriticalSQLAlchemyAndSignerTask) -def resend_with_higher_gas(self, txold_hash_hex, chain_str, gas=None, default_factor=1.1): +def resend_with_higher_gas(self, txold_hash_hex, chain_spec_dict, gas=None, default_factor=1.1): """Create a new transaction from an existing one with same nonce and higher gas price. :param txold_hash_hex: Transaction to re-create @@ -394,46 +402,55 @@ def resend_with_higher_gas(self, txold_hash_hex, chain_str, gas=None, default_fa session.close() raise NotLocalTxError(txold_hash_hex) - chain_spec = ChainSpec.from_chain_str(chain_str) - c = RpcClient(chain_spec) + chain_spec = ChainSpec.from_dict(chain_spec_dict) tx_signed_raw_bytes = bytes.fromhex(otx.signed_tx[2:]) tx = unpack(tx_signed_raw_bytes, chain_spec.chain_id()) logg.debug('resend otx {} {}'.format(tx, otx.signed_tx)) - queue = self.request.delivery_info['routing_key'] + queue = self.request.delivery_info.get('routing_key') logg.debug('before {}'.format(tx)) - if gas != None: - tx['gasPrice'] = gas - else: - gas_price = c.gas_price() - if tx['gasPrice'] > gas_price: - logg.info('Network gas price {} is lower than overdue tx gas price {}'.format(gas_price, tx['gasPrice'])) + + rpc = RPCConnection.connect(chain_spec, 'default') + new_gas_price = gas + if new_gas_price == None: + o = price() + r = rpc.do(o) + current_gas_price = int(r, 16) + if tx['gasPrice'] > current_gas_price: + logg.info('Network gas price {} is lower than overdue tx gas price {}'.format(curent_gas_price, tx['gasPrice'])) #tx['gasPrice'] = int(tx['gasPrice'] * default_factor) - tx['gasPrice'] += 1 + new_gas_price = tx['gasPrice'] + 1 else: new_gas_price = int(tx['gasPrice'] * default_factor) - if gas_price > new_gas_price: - tx['gasPrice'] = gas_price - else: - tx['gasPrice'] = new_gas_price + #if gas_price > new_gas_price: + # tx['gasPrice'] = gas_price + #else: + # tx['gasPrice'] = new_gas_price - (tx_hash_hex, tx_signed_raw_hex) = sign_tx(tx, chain_str) + + rpc_signer = RPCConnection.connect(chain_spec, 'signer') + gas_oracle = OverrideGasOracle(price=new_gas_price, conn=rpc) + + c = TxFactory(signer=rpc_signer, gas_oracle=gas_oracle, chain_id=chain_spec.chain_id()) + logg.debug('change gas price from old {} to new {} for tx {}'.format(tx['gasPrice'], new_gas_price, tx)) + tx['gasPrice'] = new_gas_price + (tx_hash_hex, tx_signed_raw_hex) = c.build_raw(tx) queue_create( tx['nonce'], tx['from'], tx_hash_hex, tx_signed_raw_hex, - chain_str, + chain_spec, session=session, ) TxCache.clone(txold_hash_hex, tx_hash_hex, session=session) session.close() - s = create_check_gas_and_send_task( + s = create_check_gas_task( [tx_signed_raw_hex], - chain_str, + chain_spec, tx['from'], tx['gasPrice'] * tx['gas'], [tx_hash_hex], diff --git a/apps/cic-eth/cic_eth/queue/tx.py b/apps/cic-eth/cic_eth/queue/tx.py index 6b228cc0..66b4e3a2 100644 --- a/apps/cic-eth/cic_eth/queue/tx.py +++ b/apps/cic-eth/cic_eth/queue/tx.py @@ -676,6 +676,7 @@ def get_status_tx(status, not_status=None, before=None, exact=False, limit=0, se q = q.filter(Otx.status.op('&')(status)>0) if not_status != None: q = q.filter(Otx.status.op('&')(not_status)==0) + q = q.order_by(Otx.nonce.asc(), Otx.date_created.asc()) i = 0 for o in q.all(): if limit > 0 and i == limit: @@ -687,7 +688,7 @@ def get_status_tx(status, not_status=None, before=None, exact=False, limit=0, se # TODO: move query to model -def get_upcoming_tx(status=StatusEnum.READYSEND, recipient=None, before=None, chain_id=0, session=None): +def get_upcoming_tx(status=StatusEnum.READYSEND, not_status=None, recipient=None, before=None, limit=0, chain_id=0, session=None): """Returns the next pending transaction, specifically the transaction with the lowest nonce, for every recipient that has pending transactions. Will omit addresses that have the LockEnum.SEND bit in Lock set. @@ -721,7 +722,10 @@ def get_upcoming_tx(status=StatusEnum.READYSEND, recipient=None, before=None, ch if status == StatusEnum.PENDING: q_outer = q_outer.filter(Otx.status==status.value) else: - q_outer = q_outer.filter(Otx.status.op('&')(status.value)==status.value) + q_outer = q_outer.filter(Otx.status.op('&')(status)==status) + + if not_status != None: + q_outer = q_outer.filter(Otx.status.op('&')(not_status)==0) if recipient != None: q_outer = q_outer.filter(TxCache.recipient==recipient) @@ -730,6 +734,7 @@ def get_upcoming_tx(status=StatusEnum.READYSEND, recipient=None, before=None, ch txs = {} + i = 0 for r in q_outer.all(): q = session.query(Otx) q = q.join(TxCache) @@ -758,6 +763,10 @@ def get_upcoming_tx(status=StatusEnum.READYSEND, recipient=None, before=None, ch session.add(o) session.commit() + i += 1 + if limit > 0 and limit == i: + break + SessionBase.release_session(session) return txs diff --git a/apps/cic-eth/cic_eth/runnable/daemons/retry.py b/apps/cic-eth/cic_eth/runnable/daemons/retry.py index 6fb94784..b7889a32 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/retry.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/retry.py @@ -1,3 +1,4 @@ +# standard imports import os import sys import logging @@ -5,22 +6,36 @@ import argparse import re import datetime -import web3 +# external imports import confini import celery from cic_eth_registry import CICRegistry from chainlib.chain import ChainSpec +from chainlib.eth.tx import unpack +from chainlib.connection import RPCConnection +from chainlib.eth.block import ( + block_latest, + block_by_number, + Block, + ) +from chainsyncer.driver import HeadSyncer +from chainsyncer.backend import MemBackend +from chainsyncer.error import NoBlockForYou +# local imports from cic_eth.db import dsn_from_config from cic_eth.db import SessionBase -from cic_eth.eth import RpcClient -from cic_eth.sync.retry import RetrySyncer -from cic_eth.queue.tx import get_status_tx -from cic_eth.queue.tx import get_tx +from cic_eth.queue.tx import ( + get_status_tx, + get_tx, +# get_upcoming_tx, + ) from cic_eth.admin.ctrl import lock_send -from cic_eth.db.enum import StatusEnum -from cic_eth.db.enum import LockEnum -from cic_eth.eth.util import unpack_signed_raw_tx_hex +from cic_eth.db.enum import ( + StatusEnum, + StatusBits, + LockEnum, + ) logging.basicConfig(level=logging.WARNING) logg = logging.getLogger() @@ -31,7 +46,8 @@ argparser = argparse.ArgumentParser(description='daemon that monitors transactio argparser.add_argument('-p', '--provider', dest='p', type=str, help='rpc provider') argparser.add_argument('-c', type=str, default=config_dir, help='config root to use') argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='chain spec') -argparser.add_argument('--retry-delay', dest='retry_delay', type=str, help='seconds to wait for retrying a transaction that is marked as sent') +argparser.add_argument('--batch-size', dest='batch_size', type=int, default=50, help='max amount of txs to resend per iteration') +argparser.add_argument('--retry-delay', dest='retry_delay', type=int, help='seconds to wait for retrying a transaction that is marked as sent') argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration') argparser.add_argument('-q', type=str, default='cic-eth', help='celery queue to submit transaction tasks to') argparser.add_argument('-v', help='be verbose', action='store_true') @@ -51,7 +67,6 @@ config.process() # override args args_override = { 'ETH_PROVIDER': getattr(args, 'p'), - 'ETH_ABI_DIR': getattr(args, 'abi_dir'), 'CIC_CHAIN_SPEC': getattr(args, 'i'), 'CIC_TX_RETRY_DELAY': getattr(args, 'retry_delay'), } @@ -59,6 +74,7 @@ config.dict_override(args_override, 'cli flag') config.censor('PASSWORD', 'DATABASE') config.censor('PASSWORD', 'SSL') logg.debug('config loaded from {}:\n{}'.format(config_dir, config)) +config.add(args.batch_size, '_BATCH_SIZE', True) app = celery.Celery(backend=config.get('CELERY_RESULT_URL'), broker=config.get('CELERY_BROKER_URL')) @@ -66,10 +82,10 @@ queue = args.q chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC')) -RPCConnection.registry_location(args.p, chain_spec, tag='default') +RPCConnection.register_location(config.get('ETH_PROVIDER'), chain_spec, tag='default') dsn = dsn_from_config(config) -SessionBase.connect(dsn) +SessionBase.connect(dsn, debug=config.true('DATABASE_DEBUG')) straggler_delay = int(config.get('CIC_TX_RETRY_DELAY')) @@ -178,53 +194,85 @@ def dispatch(conn, chain_spec): # s_send.apply_async() -class RetrySyncer(Syncer): +class StragglerFilter: - def __init__(self, chain_spec, stalled_grace_seconds, failed_grace_seconds=None, final_func=None): + def __init__(self, chain_spec, queue='cic-eth'): + self.chain_spec = chain_spec + self.queue = queue + + + def filter(self, conn, block, tx, db_session=None): + logg.debug('tx {}'.format(tx)) + s_send = celery.signature( + 'cic_eth.eth.tx.resend_with_higher_gas', + [ + tx, + self.chain_spec.asdict(), + ], + queue=self.queue, + ) + return s_send.apply_async() + #return s_send + + + def __str__(self): + return 'stragglerfilter' + + +class RetrySyncer(HeadSyncer): + + def __init__(self, conn, chain_spec, stalled_grace_seconds, batch_size=50, failed_grace_seconds=None): + backend = MemBackend(chain_spec, None) + super(RetrySyncer, self).__init__(backend) self.chain_spec = chain_spec if failed_grace_seconds == None: failed_grace_seconds = stalled_grace_seconds self.stalled_grace_seconds = stalled_grace_seconds self.failed_grace_seconds = failed_grace_seconds - self.final_func = final_func + self.batch_size = batch_size + self.conn = conn - def get(self): -# before = datetime.datetime.utcnow() - datetime.timedelta(seconds=self.failed_grace_seconds) -# failed_txs = get_status_tx( -# StatusEnum.SENDFAIL.value, -# before=before, -# ) - before = datetime.datetime.utcnow() - datetime.timedelta(seconds=self.stalled_grace_seconds) - stalled_txs = get_status_tx( - StatusBits.IN_NETWORK.value, - not_status=StatusBits.FINAL | StatusBits.MANUAL | StatusBits.OBSOLETE, - before=before, - ) - # return list(failed_txs.keys()) + list(stalled_txs.keys()) - return stalled_txs - - def process(self, conn, ref): - logg.debug('tx {}'.format(ref)) - for f in self.filter: - f(conn, ref, None, str(self.chain_spec)) + def get(self, conn): + o = block_latest() + r = conn.do(o) + (pair, flags) = self.backend.get() + n = int(r, 16) + if n == pair[0]: + raise NoBlockForYou('block {} already checked'.format(n)) + o = block_by_number(n) + r = conn.do(o) + b = Block(r) + return b + def process(self, conn, block): + before = datetime.datetime.utcnow() - datetime.timedelta(seconds=self.stalled_grace_seconds) + stalled_txs = get_status_tx( + StatusBits.IN_NETWORK.value, + not_status=StatusBits.FINAL | StatusBits.MANUAL | StatusBits.OBSOLETE, + before=before, + limit=self.batch_size, + ) +# stalled_txs = get_upcoming_tx( +# status=StatusBits.IN_NETWORK.value, +# not_status=StatusBits.FINAL | StatusBits.MANUAL | StatusBits.OBSOLETE, +# before=before, +# limit=self.batch_size, +# ) + for tx in stalled_txs: + self.filter.apply(self.conn, block, tx) + self.backend.set(block.number, 0) - def loop(self, interval): - while self.running and Syncer.running_global: - rpc = RPCConnection.connect(self.chain_spec, 'default') - for tx in self.get(): - self.process(rpc, tx) - if self.final_func != None: - self.final_func(rpc, self.chain_spec) - time.sleep(interval) def main(): - - syncer = RetrySyncer(chain_spec, straggler_delay, final_func=dispatch) - syncer.filter.append(sendfail_filter) - syncer.loop(float(straggler_delay)) + #o = block_latest() + conn = RPCConnection.connect(chain_spec, 'default') + #block = conn.do(o) + syncer = RetrySyncer(conn, chain_spec, straggler_delay, batch_size=config.get('_BATCH_SIZE')) + syncer.backend.set(0, 0) + syncer.add_filter(StragglerFilter(chain_spec, queue=queue)) + syncer.loop(float(straggler_delay), conn) if __name__ == '__main__': diff --git a/apps/cic-eth/cic_eth/runnable/resend.py b/apps/cic-eth/cic_eth/runnable/resend.py index 77faf95a..7198b8ac 100644 --- a/apps/cic-eth/cic_eth/runnable/resend.py +++ b/apps/cic-eth/cic_eth/runnable/resend.py @@ -7,13 +7,10 @@ import os # third-party imports import celery import confini -import web3 -from cic_registry import CICRegistry -from cic_registry.chain import ChainSpec -from cic_registry.chain import ChainRegistry +from chainlib.chain import ChainSpec +from chainlib.eth.connection import EthHTTPConnection # local imports -from cic_eth.eth.rpc import RpcClient from cic_eth.api.api_admin import AdminApi logging.basicConfig(level=logging.WARNING) @@ -55,41 +52,20 @@ args_override = { config.censor('PASSWORD', 'DATABASE') config.censor('PASSWORD', 'SSL') logg.debug('config loaded from {}:\n{}'.format(config_dir, config)) +config.add(args.tx_hash, '_TX_HASH', True) +config.add(args.unlock, '_UNLOCK', True) chain_spec = ChainSpec.from_chain_str(args.i) -chain_str = str(chain_spec) -re_websocket = re.compile('^wss?://') -re_http = re.compile('^https?://') -blockchain_provider = config.get('ETH_PROVIDER') -if re.match(re_websocket, blockchain_provider) != None: - blockchain_provider = web3.Web3.WebsocketProvider(blockchain_provider) -elif re.match(re_http, blockchain_provider) != None: - blockchain_provider = web3.Web3.HTTPProvider(blockchain_provider) -else: - raise ValueError('unknown provider url {}'.format(blockchain_provider)) - -def web3_constructor(): - w3 = web3.Web3(blockchain_provider) - return (blockchain_provider, w3) -RpcClient.set_constructor(web3_constructor) +rpc = EthHTTPConnection(config.get('ETH_PROVIDER')) celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL')) -c = RpcClient(chain_spec) - -CICRegistry.init(c.w3, config.get('CIC_REGISTRY_ADDRESS'), chain_spec) -chain_registry = ChainRegistry(chain_spec) -CICRegistry.add_chain_registry(chain_registry) -CICRegistry.add_path(config.get('ETH_ABI_DIR')) -CICRegistry.load_for(chain_spec) - - def main(): - api = AdminApi(c) + api = AdminApi(rpc) tx_details = api.tx(chain_spec, args.tx_hash) - t = api.resend(args.tx_hash, chain_str, unlock=True) - + t = api.resend(args.tx_hash, chain_spec, unlock=config.get('_UNLOCK')) + print(t.get_leaf()) if __name__ == '__main__': main() diff --git a/apps/cic-eth/tests/task/test_task_tx.py b/apps/cic-eth/tests/task/test_task_tx.py index 2f223c54..99d1a3d0 100644 --- a/apps/cic-eth/tests/task/test_task_tx.py +++ b/apps/cic-eth/tests/task/test_task_tx.py @@ -12,10 +12,12 @@ from chainlib.eth.tx import ( transaction, receipt, ) +from hexathon import strip_0x # local imports from cic_eth.queue.tx import register_tx from cic_eth.eth.tx import cache_gas_data +from cic_eth.db.models.otx import Otx logg = logging.getLogger() @@ -60,6 +62,7 @@ def test_tx_send( assert rcpt['status'] == 1 +@pytest.mark.skip() def test_sync_tx( default_chain_spec, eth_rpc, @@ -67,3 +70,44 @@ def test_sync_tx( celery_worker, ): pass + + +def test_resend_with_higher_gas( + init_database, + default_chain_spec, + eth_rpc, + eth_signer, + agent_roles, + celery_worker, + ): + + chain_id = default_chain_spec.chain_id() + nonce_oracle = RPCNonceOracle(agent_roles['ALICE'], eth_rpc) + c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, chain_id=chain_id) + (tx_hash_hex, tx_signed_raw_hex) = c.create(agent_roles['ALICE'], agent_roles['BOB'], 1024, tx_format=TxFormat.RLP_SIGNED) + #unpack(bytes.fromhex(strip_0x(tx_signed_raw_hex)), chain_id) + register_tx(tx_hash_hex, tx_signed_raw_hex, default_chain_spec, None, session=init_database) + cache_gas_data(tx_hash_hex, tx_signed_raw_hex, default_chain_spec.asdict()) + tx_before = unpack(bytes.fromhex(strip_0x(tx_signed_raw_hex)), default_chain_spec.chain_id()) + + s = celery.signature( + 'cic_eth.eth.tx.resend_with_higher_gas', + [ + tx_hash_hex, + default_chain_spec.asdict(), + ], + queue=None, + ) + t = s.apply_async() + r = t.get_leaf() + + q = init_database.query(Otx) + q = q.filter(Otx.tx_hash==r) + otx = q.first() + if otx == None: + raise NotLocalTxError(r) + + tx_after = unpack(bytes.fromhex(strip_0x(otx.signed_tx)), default_chain_spec.chain_id()) + logg.debug('gasprices before {} after {}'.format(tx_before['gasPrice'], tx_after['gasPrice'])) + assert tx_after['gasPrice'] > tx_before['gasPrice'] + diff --git a/apps/contract-migration/scripts/import_users.py b/apps/contract-migration/scripts/import_users.py index 8264a319..79f370a7 100644 --- a/apps/contract-migration/scripts/import_users.py +++ b/apps/contract-migration/scripts/import_users.py @@ -36,7 +36,7 @@ argparser.add_argument('--redis-port', dest='redis_port', type=int, help='redis argparser.add_argument('--redis-db', dest='redis_db', type=int, help='redis db to use for task submission and callback') argparser.add_argument('--redis-host-callback', dest='redis_host_callback', default='localhost', type=str, help='redis host to use for callback') argparser.add_argument('--redis-port-callback', dest='redis_port_callback', default=6379, type=int, help='redis port to use for callback') -argparser.add_argument('--batch-size', dest='batch_size', default=50, type=int, help='burst size of sending transactions to node') +argparser.add_argument('--batch-size', dest='batch_size', default=80, type=int, help='burst size of sending transactions to node') argparser.add_argument('--batch-delay', dest='batch_delay', default=2, type=int, help='seconds delay between batches') argparser.add_argument('--timeout', default=60.0, type=float, help='Callback timeout') argparser.add_argument('-q', type=str, default='cic-eth', help='Task queue') diff --git a/docker-compose.yml b/docker-compose.yml index 5adf741f..b0a29776 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -317,8 +317,8 @@ services: CELERY_BROKER_URL: ${CELERY_BROKER_URL:-redis://redis} CELERY_RESULT_URL: ${CELERY_RESULT_URL:-redis://redis} TASKS_TRANSFER_CALLBACKS: $TASKS_TRANSFER_CALLBACKS - #DATABASE_DEBUG: ${DATABASE_DEBUG:-false} - DATABASE_DEBUG: 1 + DATABASE_DEBUG: ${DATABASE_DEBUG:-false} + #DATABASE_DEBUG: 1 depends_on: - eth @@ -359,6 +359,8 @@ services: CELERY_RESULT_URL: ${CELERY_RESULT_URL:-redis://redis} TASKS_TRANSFER_CALLBACKS: $TASKS_TRANSFER_CALLBACKS CIC_TX_RETRY_DELAY: 15 + BATCH_SIZE: ${RETRIER_BATCH_SIZE:-50} + #DATABASE_DEBUG: 1 depends_on: - eth - postgres @@ -373,7 +375,7 @@ services: - -c - | if [[ -f /tmp/cic/config/.env ]]; then source /tmp/cic/config/.env; fi - ./start_retry.sh -vv + ./start_retry.sh -vv # command: "/root/start_retry.sh -q cic-eth -vv"