cic-internal-integration/apps/cic-eth/cic_eth/task.py

34 lines
663 B
Python
Raw Permalink Normal View History

# import
import requests
# external imports
import celery
import sqlalchemy
class CriticalTask(celery.Task):
retry_jitter = True
retry_backoff = True
retry_backoff_max = 8
class CriticalSQLAlchemyTask(CriticalTask):
autoretry_for = (
sqlalchemy.exc.DatabaseError,
sqlalchemy.exc.TimeoutError,
)
class CriticalWeb3Task(CriticalTask):
autoretry_for = (
requests.exceptions.ConnectionError,
)
class CriticalSQLAlchemyAndWeb3Task(CriticalTask):
autoretry_for = (
sqlalchemy.exc.DatabaseError,
sqlalchemy.exc.TimeoutError,
requests.exceptions.ConnectionError,
)