Correct ordering of straggler txs in retrier

This commit is contained in:
nolash 2021-03-31 08:00:58 +02:00
parent 8116375b67
commit 77cb5a6a0f
Signed by untrusted user who does not match committer: lash
GPG Key ID: 21D2E7BB88C2A746
4 changed files with 40 additions and 11 deletions

View File

@ -219,7 +219,7 @@ class AdminApi:
blocking_tx = k blocking_tx = k
blocking_nonce = nonce_otx blocking_nonce = nonce_otx
elif nonce_otx - last_nonce > 1: elif nonce_otx - last_nonce > 1:
logg.error('nonce gap; {} followed {}'.format(nonce_otx, last_nonce)) logg.error('nonce gap; {} followed {} for account {}'.format(nonce_otx, last_nonce, tx['from']))
blocking_tx = k blocking_tx = k
blocking_nonce = nonce_otx blocking_nonce = nonce_otx
break break
@ -313,10 +313,10 @@ class AdminApi:
tx_dict = s.apply_async().get() tx_dict = s.apply_async().get()
if tx_dict['sender'] == address: if tx_dict['sender'] == address:
if tx_dict['nonce'] - last_nonce > 1: if tx_dict['nonce'] - last_nonce > 1:
logg.error('nonce gap; {} followed {} for tx {}'.format(tx_dict['nonce'], last_nonce, tx_dict['hash'])) logg.error('nonce gap; {} followed {} for address {} tx {}'.format(tx_dict['nonce'], last_nonce, tx_dict['sender'], tx_hash))
errors.append('nonce') errors.append('nonce')
elif tx_dict['nonce'] == last_nonce: elif tx_dict['nonce'] == last_nonce:
logg.warning('nonce {} duplicate in tx {}'.format(tx_dict['nonce'], tx_dict['hash'])) logg.info('nonce {} duplicate for address {} in tx {}'.format(tx_dict['nonce'], tx_dict['sender'], tx_hash))
last_nonce = tx_dict['nonce'] last_nonce = tx_dict['nonce']
if not include_sender: if not include_sender:
logg.debug('skipping sender tx {}'.format(tx_dict['tx_hash'])) logg.debug('skipping sender tx {}'.format(tx_dict['tx_hash']))
@ -481,7 +481,8 @@ class AdminApi:
tx['destination_token_symbol'] = destination_token.symbol() tx['destination_token_symbol'] = destination_token.symbol()
tx['recipient_token_balance'] = source_token.function('balanceOf')(tx['recipient']).call() tx['recipient_token_balance'] = source_token.function('balanceOf')(tx['recipient']).call()
tx['network_status'] = 'Not submitted' # TODO: this can mean either not subitted or culled, need to check other txs with same nonce to determine which
tx['network_status'] = 'Not in node'
r = None r = None
try: try:

View File

@ -676,6 +676,7 @@ def get_status_tx(status, not_status=None, before=None, exact=False, limit=0, se
q = q.filter(Otx.status.op('&')(status)>0) q = q.filter(Otx.status.op('&')(status)>0)
if not_status != None: if not_status != None:
q = q.filter(Otx.status.op('&')(not_status)==0) q = q.filter(Otx.status.op('&')(not_status)==0)
q = q.order_by(Otx.nonce.asc(), Otx.date_created.asc())
i = 0 i = 0
for o in q.all(): for o in q.all():
if limit > 0 and i == limit: if limit > 0 and i == limit:
@ -687,7 +688,7 @@ def get_status_tx(status, not_status=None, before=None, exact=False, limit=0, se
# TODO: move query to model # TODO: move query to model
def get_upcoming_tx(status=StatusEnum.READYSEND, recipient=None, before=None, chain_id=0, session=None): def get_upcoming_tx(status=StatusEnum.READYSEND, not_status=None, recipient=None, before=None, limit=0, chain_id=0, session=None):
"""Returns the next pending transaction, specifically the transaction with the lowest nonce, for every recipient that has pending transactions. """Returns the next pending transaction, specifically the transaction with the lowest nonce, for every recipient that has pending transactions.
Will omit addresses that have the LockEnum.SEND bit in Lock set. Will omit addresses that have the LockEnum.SEND bit in Lock set.
@ -721,7 +722,10 @@ def get_upcoming_tx(status=StatusEnum.READYSEND, recipient=None, before=None, ch
if status == StatusEnum.PENDING: if status == StatusEnum.PENDING:
q_outer = q_outer.filter(Otx.status==status.value) q_outer = q_outer.filter(Otx.status==status.value)
else: else:
q_outer = q_outer.filter(Otx.status.op('&')(status.value)==status.value) q_outer = q_outer.filter(Otx.status.op('&')(status)==status)
if not_status != None:
q_outer = q_outer.filter(Otx.status.op('&')(not_status)==0)
if recipient != None: if recipient != None:
q_outer = q_outer.filter(TxCache.recipient==recipient) q_outer = q_outer.filter(TxCache.recipient==recipient)
@ -730,6 +734,7 @@ def get_upcoming_tx(status=StatusEnum.READYSEND, recipient=None, before=None, ch
txs = {} txs = {}
i = 0
for r in q_outer.all(): for r in q_outer.all():
q = session.query(Otx) q = session.query(Otx)
q = q.join(TxCache) q = q.join(TxCache)
@ -758,6 +763,10 @@ def get_upcoming_tx(status=StatusEnum.READYSEND, recipient=None, before=None, ch
session.add(o) session.add(o)
session.commit() session.commit()
i += 1
if limit > 0 and limit == i:
break
SessionBase.release_session(session) SessionBase.release_session(session)
return txs return txs

View File

@ -25,8 +25,11 @@ from chainsyncer.error import NoBlockForYou
# local imports # local imports
from cic_eth.db import dsn_from_config from cic_eth.db import dsn_from_config
from cic_eth.db import SessionBase from cic_eth.db import SessionBase
from cic_eth.queue.tx import get_status_tx from cic_eth.queue.tx import (
from cic_eth.queue.tx import get_tx get_status_tx,
get_tx,
# get_upcoming_tx,
)
from cic_eth.admin.ctrl import lock_send from cic_eth.admin.ctrl import lock_send
from cic_eth.db.enum import ( from cic_eth.db.enum import (
StatusEnum, StatusEnum,
@ -43,6 +46,7 @@ argparser = argparse.ArgumentParser(description='daemon that monitors transactio
argparser.add_argument('-p', '--provider', dest='p', type=str, help='rpc provider') argparser.add_argument('-p', '--provider', dest='p', type=str, help='rpc provider')
argparser.add_argument('-c', type=str, default=config_dir, help='config root to use') argparser.add_argument('-c', type=str, default=config_dir, help='config root to use')
argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='chain spec') argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='chain spec')
argparser.add_argument('--batch-size', dest='batch_size', type=int, default=50, help='max amount of txs to resend per iteration')
argparser.add_argument('--retry-delay', dest='retry_delay', type=int, help='seconds to wait for retrying a transaction that is marked as sent') argparser.add_argument('--retry-delay', dest='retry_delay', type=int, help='seconds to wait for retrying a transaction that is marked as sent')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration') argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('-q', type=str, default='cic-eth', help='celery queue to submit transaction tasks to') argparser.add_argument('-q', type=str, default='cic-eth', help='celery queue to submit transaction tasks to')
@ -70,6 +74,7 @@ config.dict_override(args_override, 'cli flag')
config.censor('PASSWORD', 'DATABASE') config.censor('PASSWORD', 'DATABASE')
config.censor('PASSWORD', 'SSL') config.censor('PASSWORD', 'SSL')
logg.debug('config loaded from {}:\n{}'.format(config_dir, config)) logg.debug('config loaded from {}:\n{}'.format(config_dir, config))
config.add(args.batch_size, '_BATCH_SIZE', True)
app = celery.Celery(backend=config.get('CELERY_RESULT_URL'), broker=config.get('CELERY_BROKER_URL')) app = celery.Celery(backend=config.get('CELERY_RESULT_URL'), broker=config.get('CELERY_BROKER_URL'))
@ -80,7 +85,7 @@ chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
RPCConnection.register_location(config.get('ETH_PROVIDER'), chain_spec, tag='default') RPCConnection.register_location(config.get('ETH_PROVIDER'), chain_spec, tag='default')
dsn = dsn_from_config(config) dsn = dsn_from_config(config)
SessionBase.connect(dsn) SessionBase.connect(dsn, debug=config.true('DATABASE_DEBUG'))
straggler_delay = int(config.get('CIC_TX_RETRY_DELAY')) straggler_delay = int(config.get('CIC_TX_RETRY_DELAY'))
@ -197,6 +202,7 @@ class StragglerFilter:
def filter(self, conn, block, tx, db_session=None): def filter(self, conn, block, tx, db_session=None):
logg.debug('tx {}'.format(tx))
s_send = celery.signature( s_send = celery.signature(
'cic_eth.eth.tx.resend_with_higher_gas', 'cic_eth.eth.tx.resend_with_higher_gas',
[ [
@ -206,6 +212,11 @@ class StragglerFilter:
queue=self.queue, queue=self.queue,
) )
return s_send.apply_async() return s_send.apply_async()
#return s_send
def __str__(self):
return 'stragglerfilter'
class RetrySyncer(HeadSyncer): class RetrySyncer(HeadSyncer):
@ -243,6 +254,12 @@ class RetrySyncer(HeadSyncer):
before=before, before=before,
limit=self.batch_size, limit=self.batch_size,
) )
# stalled_txs = get_upcoming_tx(
# status=StatusBits.IN_NETWORK.value,
# not_status=StatusBits.FINAL | StatusBits.MANUAL | StatusBits.OBSOLETE,
# before=before,
# limit=self.batch_size,
# )
for tx in stalled_txs: for tx in stalled_txs:
self.filter.apply(self.conn, block, tx) self.filter.apply(self.conn, block, tx)
self.backend.set(block.number, 0) self.backend.set(block.number, 0)
@ -252,7 +269,7 @@ def main():
#o = block_latest() #o = block_latest()
conn = RPCConnection.connect(chain_spec, 'default') conn = RPCConnection.connect(chain_spec, 'default')
#block = conn.do(o) #block = conn.do(o)
syncer = RetrySyncer(conn, chain_spec, straggler_delay) syncer = RetrySyncer(conn, chain_spec, straggler_delay, batch_size=config.get('_BATCH_SIZE'))
syncer.backend.set(0, 0) syncer.backend.set(0, 0)
syncer.add_filter(StragglerFilter(chain_spec, queue=queue)) syncer.add_filter(StragglerFilter(chain_spec, queue=queue))
syncer.loop(float(straggler_delay), conn) syncer.loop(float(straggler_delay), conn)

View File

@ -359,6 +359,8 @@ services:
CELERY_RESULT_URL: ${CELERY_RESULT_URL:-redis://redis} CELERY_RESULT_URL: ${CELERY_RESULT_URL:-redis://redis}
TASKS_TRANSFER_CALLBACKS: $TASKS_TRANSFER_CALLBACKS TASKS_TRANSFER_CALLBACKS: $TASKS_TRANSFER_CALLBACKS
CIC_TX_RETRY_DELAY: 15 CIC_TX_RETRY_DELAY: 15
BATCH_SIZE: ${RETRIER_BATCH_SIZE:-50}
#DATABASE_DEBUG: 1
depends_on: depends_on:
- eth - eth
- postgres - postgres
@ -373,7 +375,7 @@ services:
- -c - -c
- | - |
if [[ -f /tmp/cic/config/.env ]]; then source /tmp/cic/config/.env; fi if [[ -f /tmp/cic/config/.env ]]; then source /tmp/cic/config/.env; fi
./start_retry.sh -vv ./start_retry.sh -vv
# command: "/root/start_retry.sh -q cic-eth -vv" # command: "/root/start_retry.sh -q cic-eth -vv"