Merge branch 'lash/retry-on-signer-error' into 'master'

Retry on signer error

See merge request grassrootseconomics/cic-internal-integration!55
This commit is contained in:
Louis Holbrook 2021-03-07 13:51:59 +00:00
commit 543c6249b9
9 changed files with 58 additions and 42 deletions

View File

@ -66,3 +66,10 @@ class LockedError(Exception):
""" """
pass pass
class SignerError(Exception):
"""Exception raised when signer is unavailable or generates an error
"""
pass

View File

@ -21,8 +21,14 @@ from cic_eth.db.models.base import SessionBase
from cic_eth.db.models.role import AccountRole from cic_eth.db.models.role import AccountRole
from cic_eth.db.models.tx import TxCache from cic_eth.db.models.tx import TxCache
from cic_eth.eth.util import unpack_signed_raw_tx from cic_eth.eth.util import unpack_signed_raw_tx
from cic_eth.error import RoleMissingError from cic_eth.error import (
from cic_eth.task import CriticalSQLAlchemyTask RoleMissingError,
SignerError,
)
from cic_eth.task import (
CriticalSQLAlchemyTask,
CriticalSQLAlchemyAndSignerTask,
)
#logg = logging.getLogger(__name__) #logg = logging.getLogger(__name__)
logg = logging.getLogger() logg = logging.getLogger()
@ -139,7 +145,7 @@ def unpack_gift(data):
# TODO: Separate out nonce initialization task # TODO: Separate out nonce initialization task
@celery_app.task(base=CriticalSQLAlchemyTask) @celery_app.task(base=CriticalSQLAlchemyAndSignerTask)
def create(password, chain_str): def create(password, chain_str):
"""Creates and stores a new ethereum account in the keystore. """Creates and stores a new ethereum account in the keystore.
@ -154,7 +160,13 @@ def create(password, chain_str):
""" """
chain_spec = ChainSpec.from_chain_str(chain_str) chain_spec = ChainSpec.from_chain_str(chain_str)
c = RpcClient(chain_spec) c = RpcClient(chain_spec)
a = None
try:
a = c.w3.eth.personal.new_account(password) a = c.w3.eth.personal.new_account(password)
except FileNotFoundError:
pass
if a == None:
raise SignerError('create account')
logg.debug('created account {}'.format(a)) logg.debug('created account {}'.format(a))
# Initialize nonce provider record for account # Initialize nonce provider record for account
@ -165,7 +177,7 @@ def create(password, chain_str):
return a return a
@celery_app.task(bind=True, throws=(RoleMissingError,), base=CriticalSQLAlchemyTask) @celery_app.task(bind=True, throws=(RoleMissingError,), base=CriticalSQLAlchemyAndSignerTask)
def register(self, account_address, chain_str, writer_address=None): def register(self, account_address, chain_str, writer_address=None):
"""Creates a transaction to add the given address to the accounts index. """Creates a transaction to add the given address to the accounts index.
@ -215,7 +227,7 @@ def register(self, account_address, chain_str, writer_address=None):
return account_address return account_address
@celery_app.task(bind=True, base=CriticalSQLAlchemyTask) @celery_app.task(bind=True, base=CriticalSQLAlchemyAndSignerTask)
def gift(self, account_address, chain_str): def gift(self, account_address, chain_str):
"""Creates a transaction to invoke the faucet contract for the given address. """Creates a transaction to invoke the faucet contract for the given address.

View File

@ -8,6 +8,7 @@ from cic_registry.chain import ChainSpec
# local imports # local imports
from cic_eth.eth import RpcClient from cic_eth.eth import RpcClient
from cic_eth.queue.tx import create as queue_create from cic_eth.queue.tx import create as queue_create
from cic_eth.error import SignerError
celery_app = celery.current_app celery_app = celery.current_app
logg = celery_app.log.get_default_logger() logg = celery_app.log.get_default_logger()
@ -26,7 +27,13 @@ def sign_tx(tx, chain_str):
""" """
chain_spec = ChainSpec.from_chain_str(chain_str) chain_spec = ChainSpec.from_chain_str(chain_str)
c = RpcClient(chain_spec) c = RpcClient(chain_spec)
tx_transfer_signed = None
try:
tx_transfer_signed = c.w3.eth.sign_transaction(tx) tx_transfer_signed = c.w3.eth.sign_transaction(tx)
except FileNotFoundError:
pass
if tx_transfer_signed == None:
raise SignerError('sign tx')
logg.debug('tx_transfer_signed {}'.format(tx_transfer_signed)) logg.debug('tx_transfer_signed {}'.format(tx_transfer_signed))
tx_hash = c.w3.keccak(hexstr=tx_transfer_signed['raw']) tx_hash = c.w3.keccak(hexstr=tx_transfer_signed['raw'])
tx_hash_hex = tx_hash.hex() tx_hash_hex = tx_hash.hex()

View File

@ -24,6 +24,7 @@ from cic_eth.ext.address import translate_address
from cic_eth.task import ( from cic_eth.task import (
CriticalSQLAlchemyTask, CriticalSQLAlchemyTask,
CriticalWeb3Task, CriticalWeb3Task,
CriticalSQLAlchemyAndSignerTask,
) )
celery_app = celery.current_app celery_app = celery.current_app
@ -212,7 +213,7 @@ def balance(tokens, holder_address, chain_str):
return tokens return tokens
@celery_app.task(bind=True, base=CriticalSQLAlchemyTask) @celery_app.task(bind=True, base=CriticalSQLAlchemyAndSignerTask)
def transfer(self, tokens, holder_address, receiver_address, value, chain_str): def transfer(self, tokens, holder_address, receiver_address, value, chain_str):
"""Transfer ERC20 tokens between addresses """Transfer ERC20 tokens between addresses
@ -268,7 +269,7 @@ def transfer(self, tokens, holder_address, receiver_address, value, chain_str):
return tx_hash_hex return tx_hash_hex
@celery_app.task(bind=True, base=CriticalSQLAlchemyTask) @celery_app.task(bind=True, base=CriticalSQLAlchemyAndSignerTask)
def approve(self, tokens, holder_address, spender_address, value, chain_str): def approve(self, tokens, holder_address, spender_address, value, chain_str):
"""Approve ERC20 transfer on behalf of holder address """Approve ERC20 transfer on behalf of holder address

View File

@ -36,6 +36,8 @@ from cic_eth.admin.ctrl import lock_send
from cic_eth.task import ( from cic_eth.task import (
CriticalSQLAlchemyTask, CriticalSQLAlchemyTask,
CriticalWeb3Task, CriticalWeb3Task,
CriticalWeb3AndSignerTask,
CriticalSQLAlchemyAndSignerTask,
) )
celery_app = celery.current_app celery_app = celery.current_app
@ -418,7 +420,7 @@ def send(self, txs, chain_str):
# TODO: if this method fails the nonce will be out of sequence. session needs to be extended to include the queue create, so that nonce is rolled back if the second sql query fails. Better yet, split each state change into separate tasks. # TODO: if this method fails the nonce will be out of sequence. session needs to be extended to include the queue create, so that nonce is rolled back if the second sql query fails. Better yet, split each state change into separate tasks.
# TODO: method is too long, factor out code for clarity # TODO: method is too long, factor out code for clarity
@celery_app.task(bind=True, throws=(web3.exceptions.TransactionNotFound,), base=CriticalWeb3Task) @celery_app.task(bind=True, throws=(web3.exceptions.TransactionNotFound,), base=CriticalWeb3AndSignerTask)
def refill_gas(self, recipient_address, chain_str): def refill_gas(self, recipient_address, chain_str):
"""Executes a native token transaction to fund the recipient's gas expenditures. """Executes a native token transaction to fund the recipient's gas expenditures.
@ -511,7 +513,7 @@ def refill_gas(self, recipient_address, chain_str):
return tx_send_gas_signed['raw'] return tx_send_gas_signed['raw']
@celery_app.task(bind=True) @celery_app.task(bind=True, base=CriticalSQLAlchemyAndSignerTask)
def resend_with_higher_gas(self, txold_hash_hex, chain_str, gas=None, default_factor=1.1): def resend_with_higher_gas(self, txold_hash_hex, chain_str, gas=None, default_factor=1.1):
"""Create a new transaction from an existing one with same nonce and higher gas price. """Create a new transaction from an existing one with same nonce and higher gas price.

View File

@ -5,6 +5,9 @@ import requests
import celery import celery
import sqlalchemy import sqlalchemy
# local imports
from cic_eth.error import SignerError
class CriticalTask(celery.Task): class CriticalTask(celery.Task):
retry_jitter = True retry_jitter = True
@ -31,3 +34,16 @@ class CriticalSQLAlchemyAndWeb3Task(CriticalTask):
sqlalchemy.exc.TimeoutError, sqlalchemy.exc.TimeoutError,
requests.exceptions.ConnectionError, requests.exceptions.ConnectionError,
) )
class CriticalSQLAlchemyAndSignerTask(CriticalTask):
autoretry_for = (
sqlalchemy.exc.DatabaseError,
sqlalchemy.exc.TimeoutError,
SignerError,
)
class CriticalWeb3AndSignerTask(CriticalTask):
autoretry_for = (
requests.exceptions.ConnectionError,
SignerError,
)

View File

@ -6,4 +6,4 @@ HOST=localhost
PORT=5432 PORT=5432
ENGINE=postgresql ENGINE=postgresql
DRIVER=psycopg2 DRIVER=psycopg2
DEBUG= DEBUG=0

View File

@ -1,3 +1,3 @@
[tasks] [tasks]
transfer_callbacks = taskcall:cic_eth.callbacks.noop.noop transfer_callbacks = taskcall:cic_eth.callbacks.noop.noop
trace_queue_status = trace_queue_status = 1

View File

@ -1,29 +0,0 @@
# external imports
import celery
# local imports
from cic_eth.db.models.debug import Debug
def test_debug_alert(
init_database,
celery_session_worker,
):
s = celery.signature(
'cic_eth.admin.debug.alert',
[
'foo',
'bar',
'baz',
],
queue=None,
)
t = s.apply_async()
r = t.get()
assert r == 'foo'
q = init_database.query(Debug)
q = q.filter(Debug.tag=='bar')
o = q.first()
assert o.description == 'baz'