From d9a8c672de069333a9270cb139ff1693973ca4af Mon Sep 17 00:00:00 2001 From: Louis Holbrook Date: Wed, 3 Mar 2021 07:37:26 +0000 Subject: [PATCH] Atomic nonce queue db sessions --- apps/cic-eth/cic_eth/eth/account.py | 11 +++++++---- apps/cic-eth/cic_eth/eth/token.py | 21 ++++++++++++++------- apps/cic-eth/cic_eth/eth/tx.py | 20 ++++++++++++++++---- 3 files changed, 37 insertions(+), 15 deletions(-) diff --git a/apps/cic-eth/cic_eth/eth/account.py b/apps/cic-eth/cic_eth/eth/account.py index 62d2d547..cd6b3d47 100644 --- a/apps/cic-eth/cic_eth/eth/account.py +++ b/apps/cic-eth/cic_eth/eth/account.py @@ -204,8 +204,9 @@ def register(self, account_address, chain_str, writer_address=None): txf = AccountTxFactory(writer_address, c) tx_add = txf.add(account_address, chain_spec, session=session) + + (tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_add, chain_str, queue, 'cic_eth.eth.account.cache_account_data', session=session) session.close() - (tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_add, chain_str, queue, 'cic_eth.eth.account.cache_account_data') gas_budget = tx_add['gas'] * tx_add['gasPrice'] @@ -241,12 +242,14 @@ def gift(self, account_address, chain_str): c = RpcClient(chain_spec, holder_address=account_address) txf = AccountTxFactory(account_address, c) - tx_add = txf.gift(account_address, chain_spec) - (tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_add, chain_str, queue, 'cic_eth.eth.account.cache_gift_data') + session = SessionBase.create_session() + tx_add = txf.gift(account_address, chain_spec, session=session) + (tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_add, chain_str, queue, 'cic_eth.eth.account.cache_gift_data', session=session) + session.close() gas_budget = tx_add['gas'] * tx_add['gasPrice'] - logg.debug('register user tx {}'.format(tx_hash_hex)) + logg.debug('gift user tx {}'.format(tx_hash_hex)) s = create_check_gas_and_send_task( [tx_signed_raw_hex], chain_str, diff --git a/apps/cic-eth/cic_eth/eth/token.py b/apps/cic-eth/cic_eth/eth/token.py index 98dd1ec8..cb09baf2 100644 --- a/apps/cic-eth/cic_eth/eth/token.py +++ b/apps/cic-eth/cic_eth/eth/token.py @@ -46,6 +46,7 @@ class TokenTxFactory(TxFactory): spender_address, amount, chain_spec, + session=None, ): """Create an ERC20 "approve" transaction @@ -73,7 +74,7 @@ class TokenTxFactory(TxFactory): 'gas': source_token_gas, 'gasPrice': self.gas_price, 'chainId': chain_spec.chain_id(), - 'nonce': self.next_nonce(), + 'nonce': self.next_nonce(session=session), }) return tx_approve @@ -84,6 +85,7 @@ class TokenTxFactory(TxFactory): receiver_address, value, chain_spec, + session=None, ): """Create an ERC20 "transfer" transaction @@ -112,7 +114,7 @@ class TokenTxFactory(TxFactory): 'gas': source_token_gas, 'gasPrice': self.gas_price, 'chainId': chain_spec.chain_id(), - 'nonce': self.next_nonce(), + 'nonce': self.next_nonce(session=session), }) return tx_transfer @@ -244,9 +246,11 @@ def transfer(self, tokens, holder_address, receiver_address, value, chain_str): c = RpcClient(chain_spec, holder_address=holder_address) txf = TokenTxFactory(holder_address, c) - - tx_transfer = txf.transfer(t['address'], receiver_address, value, chain_spec) - (tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_transfer, chain_str, queue, cache_task='cic_eth.eth.token.otx_cache_transfer') + + session = SessionBase.create_session() + tx_transfer = txf.transfer(t['address'], receiver_address, value, chain_spec, session=session) + (tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_transfer, chain_str, queue, cache_task='cic_eth.eth.token.otx_cache_transfer', session=session) + session.close() gas_budget = tx_transfer['gas'] * tx_transfer['gasPrice'] @@ -299,8 +303,10 @@ def approve(self, tokens, holder_address, spender_address, value, chain_str): txf = TokenTxFactory(holder_address, c) - tx_transfer = txf.approve(t['address'], spender_address, value, chain_spec) - (tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_transfer, chain_str, queue, cache_task='cic_eth.eth.token.otx_cache_approve') + session = SessionBase.create_session() + tx_transfer = txf.approve(t['address'], spender_address, value, chain_spec, session=session) + (tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_transfer, chain_str, queue, cache_task='cic_eth.eth.token.otx_cache_approve', session=session) + session.close() gas_budget = tx_transfer['gas'] * tx_transfer['gasPrice'] @@ -459,6 +465,7 @@ def cache_approve_data( return (tx_hash_hex, cache_id) +# TODO: Move to dedicated metadata package class ExtendedTx: _default_decimals = 6 diff --git a/apps/cic-eth/cic_eth/eth/tx.py b/apps/cic-eth/cic_eth/eth/tx.py index f7285ff8..1c34acca 100644 --- a/apps/cic-eth/cic_eth/eth/tx.py +++ b/apps/cic-eth/cic_eth/eth/tx.py @@ -387,6 +387,7 @@ def send(self, txs, chain_str): # TODO: if this method fails the nonce will be out of sequence. session needs to be extended to include the queue create, so that nonce is rolled back if the second sql query fails. Better yet, split each state change into separate tasks. +# TODO: method is too long, factor out code for clarity @celery_app.task(bind=True, throws=(web3.exceptions.TransactionNotFound,), base=CriticalWeb3Task) def refill_gas(self, recipient_address, chain_str): """Executes a native token transaction to fund the recipient's gas expenditures. @@ -409,8 +410,8 @@ def refill_gas(self, recipient_address, chain_str): q = q.filter(TxCache.from_value!=0) q = q.filter(TxCache.recipient==recipient_address) c = q.count() - session.close() if c > 0: + session.close() raise AlreadyFillingGasError(recipient_address) queue = self.request.delivery_info['routing_key'] @@ -420,7 +421,7 @@ def refill_gas(self, recipient_address, chain_str): logg.debug('refill gas from provider address {}'.format(c.gas_provider())) default_nonce = c.w3.eth.getTransactionCount(c.gas_provider(), 'pending') nonce_generator = NonceOracle(c.gas_provider(), default_nonce) - nonce = nonce_generator.next() + nonce = nonce_generator.next(session=session) gas_price = c.gas_price() gas_limit = c.default_gas_limit refill_amount = c.refill_amount() @@ -449,7 +450,9 @@ def refill_gas(self, recipient_address, chain_str): tx_hash_hex, tx_send_gas_signed['raw'], chain_str, + session=session, ) + session.close() s_tx_cache = celery.signature( 'cic_eth.eth.tx.cache_gas_refill_data', @@ -492,8 +495,8 @@ def resend_with_higher_gas(self, txold_hash_hex, chain_str, gas=None, default_fa q = session.query(Otx) q = q.filter(Otx.tx_hash==txold_hash_hex) otx = q.first() - session.close() if otx == None: + session.close() raise NotLocalTxError(txold_hash_hex) chain_spec = ChainSpec.from_chain_str(chain_str) @@ -528,8 +531,10 @@ def resend_with_higher_gas(self, txold_hash_hex, chain_str, gas=None, default_fa tx_hash_hex, tx_signed_raw_hex, chain_str, + session=session, ) - TxCache.clone(txold_hash_hex, tx_hash_hex) + TxCache.clone(txold_hash_hex, tx_hash_hex, session=session) + session.close() s = create_check_gas_and_send_task( [tx_signed_raw_hex], @@ -546,6 +551,13 @@ def resend_with_higher_gas(self, txold_hash_hex, chain_str, gas=None, default_fa @celery_app.task(bind=True, throws=(web3.exceptions.TransactionNotFound,), base=CriticalWeb3Task) def sync_tx(self, tx_hash_hex, chain_str): + """Force update of network status of a simgle transaction + + :param tx_hash_hex: Transaction hash + :type tx_hash_hex: str, 0x-hex + :param chain_str: Chain spec string representation + :type chain_str: str + """ queue = self.request.delivery_info['routing_key']