Implement retrier on chainsyncer

This commit is contained in:
nolash 2021-03-30 00:39:26 +02:00
parent 80d0bfe234
commit 2e5ceac4d0
Signed by untrusted user who does not match committer: lash
GPG Key ID: 21D2E7BB88C2A746
2 changed files with 58 additions and 45 deletions

View File

@ -1,3 +1,4 @@
# standard imports
import os import os
import sys import sys
import logging import logging
@ -5,22 +6,28 @@ import argparse
import re import re
import datetime import datetime
import web3 # external imports
import confini import confini
import celery import celery
from cic_eth_registry import CICRegistry from cic_eth_registry import CICRegistry
from chainlib.chain import ChainSpec from chainlib.chain import ChainSpec
from chainlib.eth.tx import unpack
from chainlib.connection import RPCConnection
from chainlib.eth.block import block_latest
from chainsyncer.driver import HeadSyncer
from chainsyncer.backend import MemBackend
# local imports
from cic_eth.db import dsn_from_config from cic_eth.db import dsn_from_config
from cic_eth.db import SessionBase from cic_eth.db import SessionBase
from cic_eth.eth import RpcClient
from cic_eth.sync.retry import RetrySyncer
from cic_eth.queue.tx import get_status_tx from cic_eth.queue.tx import get_status_tx
from cic_eth.queue.tx import get_tx from cic_eth.queue.tx import get_tx
from cic_eth.admin.ctrl import lock_send from cic_eth.admin.ctrl import lock_send
from cic_eth.db.enum import StatusEnum from cic_eth.db.enum import (
from cic_eth.db.enum import LockEnum StatusEnum,
from cic_eth.eth.util import unpack_signed_raw_tx_hex StatusBits,
LockEnum,
)
logging.basicConfig(level=logging.WARNING) logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger() logg = logging.getLogger()
@ -51,7 +58,6 @@ config.process()
# override args # override args
args_override = { args_override = {
'ETH_PROVIDER': getattr(args, 'p'), 'ETH_PROVIDER': getattr(args, 'p'),
'ETH_ABI_DIR': getattr(args, 'abi_dir'),
'CIC_CHAIN_SPEC': getattr(args, 'i'), 'CIC_CHAIN_SPEC': getattr(args, 'i'),
'CIC_TX_RETRY_DELAY': getattr(args, 'retry_delay'), 'CIC_TX_RETRY_DELAY': getattr(args, 'retry_delay'),
} }
@ -66,7 +72,7 @@ queue = args.q
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC')) chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
RPCConnection.registry_location(args.p, chain_spec, tag='default') RPCConnection.register_location(config.get('ETH_PROVIDER'), chain_spec, tag='default')
dsn = dsn_from_config(config) dsn = dsn_from_config(config)
SessionBase.connect(dsn) SessionBase.connect(dsn)
@ -178,53 +184,60 @@ def dispatch(conn, chain_spec):
# s_send.apply_async() # s_send.apply_async()
class RetrySyncer(Syncer): class FooFilter:
def __init__(self, chain_spec, stalled_grace_seconds, failed_grace_seconds=None, final_func=None): def __init__(self, chain_spec, queue='cic-eth'):
self.chain_spec = chain_spec
self.queue = queue
def filter(self, conn, block, tx, db_session=None):
s_send = celery.signature(
'cic_eth.eth.resend_with_higher_gas',
[
[tx],
self.chain_spec.asdict(),
],
queue=self.queue,
)
s_send.apply_async()
class RetrySyncer(HeadSyncer):
def __init__(self, conn, chain_spec, stalled_grace_seconds, batch_size=50, failed_grace_seconds=None):
backend = MemBackend(chain_spec, None)
super(RetrySyncer, self).__init__(backend)
self.chain_spec = chain_spec self.chain_spec = chain_spec
if failed_grace_seconds == None: if failed_grace_seconds == None:
failed_grace_seconds = stalled_grace_seconds failed_grace_seconds = stalled_grace_seconds
self.stalled_grace_seconds = stalled_grace_seconds self.stalled_grace_seconds = stalled_grace_seconds
self.failed_grace_seconds = failed_grace_seconds self.failed_grace_seconds = failed_grace_seconds
self.final_func = final_func self.batch_size = batch_size
self.conn = conn
def get(self): def process(self, conn, block):
# before = datetime.datetime.utcnow() - datetime.timedelta(seconds=self.failed_grace_seconds) before = datetime.datetime.utcnow() - datetime.timedelta(seconds=self.stalled_grace_seconds)
# failed_txs = get_status_tx( stalled_txs = get_status_tx(
# StatusEnum.SENDFAIL.value, StatusBits.IN_NETWORK.value,
# before=before, not_status=StatusBits.FINAL | StatusBits.MANUAL | StatusBits.OBSOLETE,
# ) before=before,
before = datetime.datetime.utcnow() - datetime.timedelta(seconds=self.stalled_grace_seconds) limit=self.batch_size,
stalled_txs = get_status_tx( )
StatusBits.IN_NETWORK.value, for tx in stalled_txs:
not_status=StatusBits.FINAL | StatusBits.MANUAL | StatusBits.OBSOLETE, self.filter.apply(self.conn, block, tx)
before=before, self.backend.set(block.number + 1, 0)
)
# return list(failed_txs.keys()) + list(stalled_txs.keys())
return stalled_txs
def process(self, conn, ref):
logg.debug('tx {}'.format(ref))
for f in self.filter:
f(conn, ref, None, str(self.chain_spec))
def loop(self, interval):
while self.running and Syncer.running_global:
rpc = RPCConnection.connect(self.chain_spec, 'default')
for tx in self.get():
self.process(rpc, tx)
if self.final_func != None:
self.final_func(rpc, self.chain_spec)
time.sleep(interval)
def main(): def main():
o = block_latest()
syncer = RetrySyncer(chain_spec, straggler_delay, final_func=dispatch) conn = RPCConnection.connect(chain_spec, 'default')
syncer.filter.append(sendfail_filter) block = conn.do(o)
syncer.loop(float(straggler_delay)) syncer = RetrySyncer(conn, chain_spec, straggler_delay)
syncer.backend.set(int(block, 16), 0)
syncer.add_filter(FooFilter(chain_spec, queue=queue))
syncer.loop(float(straggler_delay), conn)
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -373,7 +373,7 @@ services:
- -c - -c
- | - |
if [[ -f /tmp/cic/config/.env ]]; then source /tmp/cic/config/.env; fi if [[ -f /tmp/cic/config/.env ]]; then source /tmp/cic/config/.env; fi
./start_retry.sh -v ./start_retry.sh -vv
# command: "/root/start_retry.sh -q cic-eth -vv" # command: "/root/start_retry.sh -q cic-eth -vv"