Compare commits
2 Commits
master
...
lash/retry
Author | SHA1 | Date | |
---|---|---|---|
|
ec45ce6db4 | ||
|
9de522f9d9 |
@ -66,3 +66,10 @@ class LockedError(Exception):
|
|||||||
|
|
||||||
"""
|
"""
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class SignerError(Exception):
|
||||||
|
"""Exception raised when signer is unavailable or generates an error
|
||||||
|
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
@ -21,8 +21,14 @@ from cic_eth.db.models.base import SessionBase
|
|||||||
from cic_eth.db.models.role import AccountRole
|
from cic_eth.db.models.role import AccountRole
|
||||||
from cic_eth.db.models.tx import TxCache
|
from cic_eth.db.models.tx import TxCache
|
||||||
from cic_eth.eth.util import unpack_signed_raw_tx
|
from cic_eth.eth.util import unpack_signed_raw_tx
|
||||||
from cic_eth.error import RoleMissingError
|
from cic_eth.error import (
|
||||||
from cic_eth.task import CriticalSQLAlchemyTask
|
RoleMissingError,
|
||||||
|
SignerError,
|
||||||
|
)
|
||||||
|
from cic_eth.task import (
|
||||||
|
CriticalSQLAlchemyTask,
|
||||||
|
CriticalSQLAlchemyAndSignerTask,
|
||||||
|
)
|
||||||
|
|
||||||
#logg = logging.getLogger(__name__)
|
#logg = logging.getLogger(__name__)
|
||||||
logg = logging.getLogger()
|
logg = logging.getLogger()
|
||||||
@ -139,7 +145,7 @@ def unpack_gift(data):
|
|||||||
|
|
||||||
|
|
||||||
# TODO: Separate out nonce initialization task
|
# TODO: Separate out nonce initialization task
|
||||||
@celery_app.task(base=CriticalSQLAlchemyTask)
|
@celery_app.task(base=CriticalSQLAlchemyAndSignerTask)
|
||||||
def create(password, chain_str):
|
def create(password, chain_str):
|
||||||
"""Creates and stores a new ethereum account in the keystore.
|
"""Creates and stores a new ethereum account in the keystore.
|
||||||
|
|
||||||
@ -154,7 +160,13 @@ def create(password, chain_str):
|
|||||||
"""
|
"""
|
||||||
chain_spec = ChainSpec.from_chain_str(chain_str)
|
chain_spec = ChainSpec.from_chain_str(chain_str)
|
||||||
c = RpcClient(chain_spec)
|
c = RpcClient(chain_spec)
|
||||||
a = c.w3.eth.personal.new_account(password)
|
a = None
|
||||||
|
try:
|
||||||
|
a = c.w3.eth.personal.new_account(password)
|
||||||
|
except FileNotFoundError:
|
||||||
|
pass
|
||||||
|
if a == None:
|
||||||
|
raise SignerError('create account')
|
||||||
logg.debug('created account {}'.format(a))
|
logg.debug('created account {}'.format(a))
|
||||||
|
|
||||||
# Initialize nonce provider record for account
|
# Initialize nonce provider record for account
|
||||||
@ -165,7 +177,7 @@ def create(password, chain_str):
|
|||||||
return a
|
return a
|
||||||
|
|
||||||
|
|
||||||
@celery_app.task(bind=True, throws=(RoleMissingError,), base=CriticalSQLAlchemyTask)
|
@celery_app.task(bind=True, throws=(RoleMissingError,), base=CriticalSQLAlchemyAndSignerTask)
|
||||||
def register(self, account_address, chain_str, writer_address=None):
|
def register(self, account_address, chain_str, writer_address=None):
|
||||||
"""Creates a transaction to add the given address to the accounts index.
|
"""Creates a transaction to add the given address to the accounts index.
|
||||||
|
|
||||||
@ -215,7 +227,7 @@ def register(self, account_address, chain_str, writer_address=None):
|
|||||||
return account_address
|
return account_address
|
||||||
|
|
||||||
|
|
||||||
@celery_app.task(bind=True, base=CriticalSQLAlchemyTask)
|
@celery_app.task(bind=True, base=CriticalSQLAlchemyAndSignerTask)
|
||||||
def gift(self, account_address, chain_str):
|
def gift(self, account_address, chain_str):
|
||||||
"""Creates a transaction to invoke the faucet contract for the given address.
|
"""Creates a transaction to invoke the faucet contract for the given address.
|
||||||
|
|
||||||
|
@ -8,6 +8,7 @@ from cic_registry.chain import ChainSpec
|
|||||||
# local imports
|
# local imports
|
||||||
from cic_eth.eth import RpcClient
|
from cic_eth.eth import RpcClient
|
||||||
from cic_eth.queue.tx import create as queue_create
|
from cic_eth.queue.tx import create as queue_create
|
||||||
|
from cic_eth.error import SignerError
|
||||||
|
|
||||||
celery_app = celery.current_app
|
celery_app = celery.current_app
|
||||||
logg = celery_app.log.get_default_logger()
|
logg = celery_app.log.get_default_logger()
|
||||||
@ -26,7 +27,13 @@ def sign_tx(tx, chain_str):
|
|||||||
"""
|
"""
|
||||||
chain_spec = ChainSpec.from_chain_str(chain_str)
|
chain_spec = ChainSpec.from_chain_str(chain_str)
|
||||||
c = RpcClient(chain_spec)
|
c = RpcClient(chain_spec)
|
||||||
tx_transfer_signed = c.w3.eth.sign_transaction(tx)
|
tx_transfer_signed = None
|
||||||
|
try:
|
||||||
|
tx_transfer_signed = c.w3.eth.sign_transaction(tx)
|
||||||
|
except FileNotFoundError:
|
||||||
|
pass
|
||||||
|
if tx_transfer_signed == None:
|
||||||
|
raise SignerError('sign tx')
|
||||||
logg.debug('tx_transfer_signed {}'.format(tx_transfer_signed))
|
logg.debug('tx_transfer_signed {}'.format(tx_transfer_signed))
|
||||||
tx_hash = c.w3.keccak(hexstr=tx_transfer_signed['raw'])
|
tx_hash = c.w3.keccak(hexstr=tx_transfer_signed['raw'])
|
||||||
tx_hash_hex = tx_hash.hex()
|
tx_hash_hex = tx_hash.hex()
|
||||||
|
@ -24,6 +24,7 @@ from cic_eth.ext.address import translate_address
|
|||||||
from cic_eth.task import (
|
from cic_eth.task import (
|
||||||
CriticalSQLAlchemyTask,
|
CriticalSQLAlchemyTask,
|
||||||
CriticalWeb3Task,
|
CriticalWeb3Task,
|
||||||
|
CriticalSQLAlchemyAndSignerTask,
|
||||||
)
|
)
|
||||||
|
|
||||||
celery_app = celery.current_app
|
celery_app = celery.current_app
|
||||||
@ -212,7 +213,7 @@ def balance(tokens, holder_address, chain_str):
|
|||||||
return tokens
|
return tokens
|
||||||
|
|
||||||
|
|
||||||
@celery_app.task(bind=True, base=CriticalSQLAlchemyTask)
|
@celery_app.task(bind=True, base=CriticalSQLAlchemyAndSignerTask)
|
||||||
def transfer(self, tokens, holder_address, receiver_address, value, chain_str):
|
def transfer(self, tokens, holder_address, receiver_address, value, chain_str):
|
||||||
"""Transfer ERC20 tokens between addresses
|
"""Transfer ERC20 tokens between addresses
|
||||||
|
|
||||||
@ -268,7 +269,7 @@ def transfer(self, tokens, holder_address, receiver_address, value, chain_str):
|
|||||||
return tx_hash_hex
|
return tx_hash_hex
|
||||||
|
|
||||||
|
|
||||||
@celery_app.task(bind=True, base=CriticalSQLAlchemyTask)
|
@celery_app.task(bind=True, base=CriticalSQLAlchemyAndSignerTask)
|
||||||
def approve(self, tokens, holder_address, spender_address, value, chain_str):
|
def approve(self, tokens, holder_address, spender_address, value, chain_str):
|
||||||
"""Approve ERC20 transfer on behalf of holder address
|
"""Approve ERC20 transfer on behalf of holder address
|
||||||
|
|
||||||
|
@ -36,6 +36,8 @@ from cic_eth.admin.ctrl import lock_send
|
|||||||
from cic_eth.task import (
|
from cic_eth.task import (
|
||||||
CriticalSQLAlchemyTask,
|
CriticalSQLAlchemyTask,
|
||||||
CriticalWeb3Task,
|
CriticalWeb3Task,
|
||||||
|
CriticalWeb3AndSignerTask,
|
||||||
|
CriticalSQLAlchemyAndSignerTask,
|
||||||
)
|
)
|
||||||
|
|
||||||
celery_app = celery.current_app
|
celery_app = celery.current_app
|
||||||
@ -418,7 +420,7 @@ def send(self, txs, chain_str):
|
|||||||
|
|
||||||
# TODO: if this method fails the nonce will be out of sequence. session needs to be extended to include the queue create, so that nonce is rolled back if the second sql query fails. Better yet, split each state change into separate tasks.
|
# TODO: if this method fails the nonce will be out of sequence. session needs to be extended to include the queue create, so that nonce is rolled back if the second sql query fails. Better yet, split each state change into separate tasks.
|
||||||
# TODO: method is too long, factor out code for clarity
|
# TODO: method is too long, factor out code for clarity
|
||||||
@celery_app.task(bind=True, throws=(web3.exceptions.TransactionNotFound,), base=CriticalWeb3Task)
|
@celery_app.task(bind=True, throws=(web3.exceptions.TransactionNotFound,), base=CriticalWeb3AndSignerTask)
|
||||||
def refill_gas(self, recipient_address, chain_str):
|
def refill_gas(self, recipient_address, chain_str):
|
||||||
"""Executes a native token transaction to fund the recipient's gas expenditures.
|
"""Executes a native token transaction to fund the recipient's gas expenditures.
|
||||||
|
|
||||||
@ -511,7 +513,7 @@ def refill_gas(self, recipient_address, chain_str):
|
|||||||
return tx_send_gas_signed['raw']
|
return tx_send_gas_signed['raw']
|
||||||
|
|
||||||
|
|
||||||
@celery_app.task(bind=True)
|
@celery_app.task(bind=True, base=CriticalSQLAlchemyAndSignerTask)
|
||||||
def resend_with_higher_gas(self, txold_hash_hex, chain_str, gas=None, default_factor=1.1):
|
def resend_with_higher_gas(self, txold_hash_hex, chain_str, gas=None, default_factor=1.1):
|
||||||
"""Create a new transaction from an existing one with same nonce and higher gas price.
|
"""Create a new transaction from an existing one with same nonce and higher gas price.
|
||||||
|
|
||||||
|
@ -5,6 +5,9 @@ import requests
|
|||||||
import celery
|
import celery
|
||||||
import sqlalchemy
|
import sqlalchemy
|
||||||
|
|
||||||
|
# local imports
|
||||||
|
from cic_eth.error import SignerError
|
||||||
|
|
||||||
|
|
||||||
class CriticalTask(celery.Task):
|
class CriticalTask(celery.Task):
|
||||||
retry_jitter = True
|
retry_jitter = True
|
||||||
@ -31,3 +34,16 @@ class CriticalSQLAlchemyAndWeb3Task(CriticalTask):
|
|||||||
sqlalchemy.exc.TimeoutError,
|
sqlalchemy.exc.TimeoutError,
|
||||||
requests.exceptions.ConnectionError,
|
requests.exceptions.ConnectionError,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
class CriticalSQLAlchemyAndSignerTask(CriticalTask):
|
||||||
|
autoretry_for = (
|
||||||
|
sqlalchemy.exc.DatabaseError,
|
||||||
|
sqlalchemy.exc.TimeoutError,
|
||||||
|
SignerError,
|
||||||
|
)
|
||||||
|
|
||||||
|
class CriticalWeb3AndSignerTask(CriticalTask):
|
||||||
|
autoretry_for = (
|
||||||
|
requests.exceptions.ConnectionError,
|
||||||
|
SignerError,
|
||||||
|
)
|
||||||
|
@ -6,4 +6,4 @@ HOST=localhost
|
|||||||
PORT=5432
|
PORT=5432
|
||||||
ENGINE=postgresql
|
ENGINE=postgresql
|
||||||
DRIVER=psycopg2
|
DRIVER=psycopg2
|
||||||
DEBUG=
|
DEBUG=0
|
||||||
|
@ -1,3 +1,3 @@
|
|||||||
[tasks]
|
[tasks]
|
||||||
transfer_callbacks = taskcall:cic_eth.callbacks.noop.noop
|
transfer_callbacks = taskcall:cic_eth.callbacks.noop.noop
|
||||||
trace_queue_status =
|
trace_queue_status = 1
|
||||||
|
@ -1,29 +0,0 @@
|
|||||||
# external imports
|
|
||||||
import celery
|
|
||||||
|
|
||||||
# local imports
|
|
||||||
from cic_eth.db.models.debug import Debug
|
|
||||||
|
|
||||||
|
|
||||||
def test_debug_alert(
|
|
||||||
init_database,
|
|
||||||
celery_session_worker,
|
|
||||||
):
|
|
||||||
|
|
||||||
s = celery.signature(
|
|
||||||
'cic_eth.admin.debug.alert',
|
|
||||||
[
|
|
||||||
'foo',
|
|
||||||
'bar',
|
|
||||||
'baz',
|
|
||||||
],
|
|
||||||
queue=None,
|
|
||||||
)
|
|
||||||
t = s.apply_async()
|
|
||||||
r = t.get()
|
|
||||||
assert r == 'foo'
|
|
||||||
|
|
||||||
q = init_database.query(Debug)
|
|
||||||
q = q.filter(Debug.tag=='bar')
|
|
||||||
o = q.first()
|
|
||||||
assert o.description == 'baz'
|
|
Loading…
Reference in New Issue
Block a user