diff --git a/apps/cic-eth/cic_eth/eth/account.py b/apps/cic-eth/cic_eth/eth/account.py index 6729ca45..37ac0c26 100644 --- a/apps/cic-eth/cic_eth/eth/account.py +++ b/apps/cic-eth/cic_eth/eth/account.py @@ -21,6 +21,7 @@ from cic_eth.db.models.role import AccountRole from cic_eth.db.models.tx import TxCache from cic_eth.eth.util import unpack_signed_raw_tx from cic_eth.error import RoleMissingError +from cic_eth.task import CriticalSQLAlchemyTask #logg = logging.getLogger(__name__) logg = logging.getLogger() @@ -130,7 +131,8 @@ def unpack_gift(data): } -@celery_app.task() +# TODO: Separate out nonce initialization task +@celery_app.task(base=CriticalSQLAlchemyTask) def create(password, chain_str): """Creates and stores a new ethereum account in the keystore. @@ -149,6 +151,7 @@ def create(password, chain_str): logg.debug('created account {}'.format(a)) # Initialize nonce provider record for account + # TODO: this can safely be set to zero, since we are randomly creating account n = c.w3.eth.getTransactionCount(a, 'pending') session = SessionBase.create_session() o = session.query(Nonce).filter(Nonce.address_hex==a).first() @@ -162,7 +165,7 @@ def create(password, chain_str): return a -@celery_app.task(bind=True, throws=(RoleMissingError,)) +@celery_app.task(bind=True, throws=(RoleMissingError,), base=CriticalSQLAlchemyTask) def register(self, account_address, chain_str, writer_address=None): """Creates a transaction to add the given address to the accounts index. @@ -211,7 +214,7 @@ def register(self, account_address, chain_str, writer_address=None): return account_address -@celery_app.task(bind=True) +@celery_app.task(bind=True, base=CriticalSQLAlchemyTask) def gift(self, account_address, chain_str): """Creates a transaction to invoke the faucet contract for the given address. @@ -326,7 +329,7 @@ def cache_gift_data( return (tx_hash_hex, cache_id) -@celery_app.task() +@celery_app.task(base=CriticalSQLAlchemyTask) def cache_account_data( tx_hash_hex, tx_signed_raw_hex, diff --git a/apps/cic-eth/cic_eth/eth/task.py b/apps/cic-eth/cic_eth/eth/task.py index 0aeb4371..0b52cab4 100644 --- a/apps/cic-eth/cic_eth/eth/task.py +++ b/apps/cic-eth/cic_eth/eth/task.py @@ -51,19 +51,6 @@ def sign_and_register_tx(tx, chain_str, queue, cache_task=None): logg.debug('adding queue tx {}'.format(tx_hash_hex)) -# s = celery.signature( -# 'cic_eth.queue.tx.create', -# [ -# tx['nonce'], -# tx['from'], -# tx_hash_hex, -# tx_signed_raw_hex, -# chain_str, -# ], -# queue=queue, -# ) - - # TODO: consider returning this as a signature that consequtive tasks can be linked to queue_create( tx['nonce'], tx['from'], diff --git a/apps/cic-eth/cic_eth/task.py b/apps/cic-eth/cic_eth/task.py new file mode 100644 index 00000000..233a2c90 --- /dev/null +++ b/apps/cic-eth/cic_eth/task.py @@ -0,0 +1,13 @@ +# external imports +import celery +import sqlalchemy + + +class CriticalSQLAlchemyTask(celery.Task): + autoretry_for = ( + sqlalchemy.exc.DatabaseError, + sqlalchemy.exc.TimeoutError, + ) + retry_jitter = True + retry_backoff = True + retry_backoff_max = 8