diff --git a/apps/cic-eth/cic_eth/api/api_admin.py b/apps/cic-eth/cic_eth/api/api_admin.py index aaf7b299..458b03fb 100644 --- a/apps/cic-eth/cic_eth/api/api_admin.py +++ b/apps/cic-eth/cic_eth/api/api_admin.py @@ -219,7 +219,7 @@ class AdminApi: blocking_tx = k blocking_nonce = nonce_otx elif nonce_otx - last_nonce > 1: - logg.error('nonce gap; {} followed {}'.format(nonce_otx, last_nonce)) + logg.error('nonce gap; {} followed {} for account {}'.format(nonce_otx, last_nonce, tx['from'])) blocking_tx = k blocking_nonce = nonce_otx break @@ -313,10 +313,10 @@ class AdminApi: tx_dict = s.apply_async().get() if tx_dict['sender'] == address: if tx_dict['nonce'] - last_nonce > 1: - logg.error('nonce gap; {} followed {} for tx {}'.format(tx_dict['nonce'], last_nonce, tx_dict['hash'])) + logg.error('nonce gap; {} followed {} for address {} tx {}'.format(tx_dict['nonce'], last_nonce, tx_dict['sender'], tx_hash)) errors.append('nonce') elif tx_dict['nonce'] == last_nonce: - logg.warning('nonce {} duplicate in tx {}'.format(tx_dict['nonce'], tx_dict['hash'])) + logg.info('nonce {} duplicate for address {} in tx {}'.format(tx_dict['nonce'], tx_dict['sender'], tx_hash)) last_nonce = tx_dict['nonce'] if not include_sender: logg.debug('skipping sender tx {}'.format(tx_dict['tx_hash'])) @@ -481,7 +481,8 @@ class AdminApi: tx['destination_token_symbol'] = destination_token.symbol() tx['recipient_token_balance'] = source_token.function('balanceOf')(tx['recipient']).call() - tx['network_status'] = 'Not submitted' + # TODO: this can mean either not subitted or culled, need to check other txs with same nonce to determine which + tx['network_status'] = 'Not in node' r = None try: diff --git a/apps/cic-eth/cic_eth/queue/tx.py b/apps/cic-eth/cic_eth/queue/tx.py index 6b228cc0..66b4e3a2 100644 --- a/apps/cic-eth/cic_eth/queue/tx.py +++ b/apps/cic-eth/cic_eth/queue/tx.py @@ -676,6 +676,7 @@ def get_status_tx(status, not_status=None, before=None, exact=False, limit=0, se q = q.filter(Otx.status.op('&')(status)>0) if not_status != None: q = q.filter(Otx.status.op('&')(not_status)==0) + q = q.order_by(Otx.nonce.asc(), Otx.date_created.asc()) i = 0 for o in q.all(): if limit > 0 and i == limit: @@ -687,7 +688,7 @@ def get_status_tx(status, not_status=None, before=None, exact=False, limit=0, se # TODO: move query to model -def get_upcoming_tx(status=StatusEnum.READYSEND, recipient=None, before=None, chain_id=0, session=None): +def get_upcoming_tx(status=StatusEnum.READYSEND, not_status=None, recipient=None, before=None, limit=0, chain_id=0, session=None): """Returns the next pending transaction, specifically the transaction with the lowest nonce, for every recipient that has pending transactions. Will omit addresses that have the LockEnum.SEND bit in Lock set. @@ -721,7 +722,10 @@ def get_upcoming_tx(status=StatusEnum.READYSEND, recipient=None, before=None, ch if status == StatusEnum.PENDING: q_outer = q_outer.filter(Otx.status==status.value) else: - q_outer = q_outer.filter(Otx.status.op('&')(status.value)==status.value) + q_outer = q_outer.filter(Otx.status.op('&')(status)==status) + + if not_status != None: + q_outer = q_outer.filter(Otx.status.op('&')(not_status)==0) if recipient != None: q_outer = q_outer.filter(TxCache.recipient==recipient) @@ -730,6 +734,7 @@ def get_upcoming_tx(status=StatusEnum.READYSEND, recipient=None, before=None, ch txs = {} + i = 0 for r in q_outer.all(): q = session.query(Otx) q = q.join(TxCache) @@ -758,6 +763,10 @@ def get_upcoming_tx(status=StatusEnum.READYSEND, recipient=None, before=None, ch session.add(o) session.commit() + i += 1 + if limit > 0 and limit == i: + break + SessionBase.release_session(session) return txs diff --git a/apps/cic-eth/cic_eth/runnable/daemons/retry.py b/apps/cic-eth/cic_eth/runnable/daemons/retry.py index 9f87769d..b7889a32 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/retry.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/retry.py @@ -25,8 +25,11 @@ from chainsyncer.error import NoBlockForYou # local imports from cic_eth.db import dsn_from_config from cic_eth.db import SessionBase -from cic_eth.queue.tx import get_status_tx -from cic_eth.queue.tx import get_tx +from cic_eth.queue.tx import ( + get_status_tx, + get_tx, +# get_upcoming_tx, + ) from cic_eth.admin.ctrl import lock_send from cic_eth.db.enum import ( StatusEnum, @@ -43,6 +46,7 @@ argparser = argparse.ArgumentParser(description='daemon that monitors transactio argparser.add_argument('-p', '--provider', dest='p', type=str, help='rpc provider') argparser.add_argument('-c', type=str, default=config_dir, help='config root to use') argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='chain spec') +argparser.add_argument('--batch-size', dest='batch_size', type=int, default=50, help='max amount of txs to resend per iteration') argparser.add_argument('--retry-delay', dest='retry_delay', type=int, help='seconds to wait for retrying a transaction that is marked as sent') argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration') argparser.add_argument('-q', type=str, default='cic-eth', help='celery queue to submit transaction tasks to') @@ -70,6 +74,7 @@ config.dict_override(args_override, 'cli flag') config.censor('PASSWORD', 'DATABASE') config.censor('PASSWORD', 'SSL') logg.debug('config loaded from {}:\n{}'.format(config_dir, config)) +config.add(args.batch_size, '_BATCH_SIZE', True) app = celery.Celery(backend=config.get('CELERY_RESULT_URL'), broker=config.get('CELERY_BROKER_URL')) @@ -80,7 +85,7 @@ chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC')) RPCConnection.register_location(config.get('ETH_PROVIDER'), chain_spec, tag='default') dsn = dsn_from_config(config) -SessionBase.connect(dsn) +SessionBase.connect(dsn, debug=config.true('DATABASE_DEBUG')) straggler_delay = int(config.get('CIC_TX_RETRY_DELAY')) @@ -197,6 +202,7 @@ class StragglerFilter: def filter(self, conn, block, tx, db_session=None): + logg.debug('tx {}'.format(tx)) s_send = celery.signature( 'cic_eth.eth.tx.resend_with_higher_gas', [ @@ -206,6 +212,11 @@ class StragglerFilter: queue=self.queue, ) return s_send.apply_async() + #return s_send + + + def __str__(self): + return 'stragglerfilter' class RetrySyncer(HeadSyncer): @@ -243,6 +254,12 @@ class RetrySyncer(HeadSyncer): before=before, limit=self.batch_size, ) +# stalled_txs = get_upcoming_tx( +# status=StatusBits.IN_NETWORK.value, +# not_status=StatusBits.FINAL | StatusBits.MANUAL | StatusBits.OBSOLETE, +# before=before, +# limit=self.batch_size, +# ) for tx in stalled_txs: self.filter.apply(self.conn, block, tx) self.backend.set(block.number, 0) @@ -252,7 +269,7 @@ def main(): #o = block_latest() conn = RPCConnection.connect(chain_spec, 'default') #block = conn.do(o) - syncer = RetrySyncer(conn, chain_spec, straggler_delay) + syncer = RetrySyncer(conn, chain_spec, straggler_delay, batch_size=config.get('_BATCH_SIZE')) syncer.backend.set(0, 0) syncer.add_filter(StragglerFilter(chain_spec, queue=queue)) syncer.loop(float(straggler_delay), conn) diff --git a/docker-compose.yml b/docker-compose.yml index 23c61a48..b0a29776 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -359,6 +359,8 @@ services: CELERY_RESULT_URL: ${CELERY_RESULT_URL:-redis://redis} TASKS_TRANSFER_CALLBACKS: $TASKS_TRANSFER_CALLBACKS CIC_TX_RETRY_DELAY: 15 + BATCH_SIZE: ${RETRIER_BATCH_SIZE:-50} + #DATABASE_DEBUG: 1 depends_on: - eth - postgres @@ -373,7 +375,7 @@ services: - -c - | if [[ -f /tmp/cic/config/.env ]]; then source /tmp/cic/config/.env; fi - ./start_retry.sh -vv + ./start_retry.sh -vv # command: "/root/start_retry.sh -q cic-eth -vv"