diff --git a/apps/cic-eth/cic_eth/admin/nonce.py b/apps/cic-eth/cic_eth/admin/nonce.py index a0d95cc9..7bef87fb 100644 --- a/apps/cic-eth/cic_eth/admin/nonce.py +++ b/apps/cic-eth/cic_eth/admin/nonce.py @@ -22,8 +22,7 @@ from cic_eth.queue.tx import ( ) from cic_eth.queue.tx import create as queue_create from cic_eth.eth.util import unpack_signed_raw_tx -from cic_eth.eth.task import ( - #sign_tx, +from cic_eth.eth.gas import ( create_check_gas_task, ) diff --git a/apps/cic-eth/cic_eth/db/models/nonce.py b/apps/cic-eth/cic_eth/db/models/nonce.py index 6cc1db91..37e69850 100644 --- a/apps/cic-eth/cic_eth/db/models/nonce.py +++ b/apps/cic-eth/cic_eth/db/models/nonce.py @@ -153,7 +153,7 @@ class NonceReservation(SessionBase): session = SessionBase.bind_session(session) - nonce = NonceReservation.peek(key, session=session) + nonce = NonceReservation.peek(address, key, session=session) q = session.query(NonceReservation) q = q.filter(NonceReservation.address_hex==address) diff --git a/apps/cic-eth/cic_eth/eth/account.py b/apps/cic-eth/cic_eth/eth/account.py index 58ff67b0..e22eb13f 100644 --- a/apps/cic-eth/cic_eth/eth/account.py +++ b/apps/cic-eth/cic_eth/eth/account.py @@ -21,11 +21,10 @@ from eth_accounts_index import AccountRegistry #from cic_eth.eth import RpcClient from cic_eth_registry import CICRegistry from cic_eth.eth import registry_extra_identifiers -from cic_eth.eth.task import ( - register_tx, +from cic_eth.eth.gas import ( create_check_gas_task, ) -from cic_eth.eth.factory import TxFactory +#from cic_eth.eth.factory import TxFactory from cic_eth.db.models.nonce import Nonce from cic_eth.db.models.base import SessionBase from cic_eth.db.models.role import AccountRole @@ -40,6 +39,12 @@ from cic_eth.task import ( CriticalSQLAlchemyAndSignerTask, BaseTask, ) +from cic_eth.eth.nonce import ( + CustodialTaskNonceOracle, + ) +from cic_eth.queue.tx import ( + register_tx, + ) #logg = logging.getLogger(__name__) logg = logging.getLogger() @@ -48,73 +53,73 @@ celery_app = celery.current_app #celery_app.log.redirect_stdouts_to_logger(logg, loglevel=logging.DEBUG) -class AccountTxFactory(TxFactory): - """Factory for creating account index contract transactions - """ - def add( - self, - address, - chain_spec, - uuid, - session=None, - ): - """Register an Ethereum account address with the on-chain account registry - - :param address: Ethereum account address to add - :type address: str, 0x-hex - :param chain_spec: Chain to build transaction for - :type chain_spec: cic_registry.chain.ChainSpec - :returns: Unsigned "AccountRegistry.add" transaction in standard Ethereum format - :rtype: dict - """ - - c = self.registry.get_contract(chain_spec, 'AccountRegistry') - f = c.function('add') - tx_add_buildable = f( - address, - ) - gas = c.gas('add') - tx_add = tx_add_buildable.buildTransaction({ - 'from': self.address, - 'gas': gas, - 'gasPrice': self.gas_price, - 'chainId': chain_spec.chain_id(), - 'nonce': self.next_nonce(uuid, session=session), - 'value': 0, - }) - return tx_add - - - def gift( - self, - address, - chain_spec, - uuid, - session=None, - ): - """Trigger the on-chain faucet to disburse tokens to the provided Ethereum account - - :param address: Ethereum account address to gift to - :type address: str, 0x-hex - :param chain_spec: Chain to build transaction for - :type chain_spec: cic_registry.chain.ChainSpec - :returns: Unsigned "Faucet.giveTo" transaction in standard Ethereum format - :rtype: dict - """ - - c = self.registry.get_contract(chain_spec, 'Faucet') - f = c.function('giveTo') - tx_add_buildable = f(address) - gas = c.gas('add') - tx_add = tx_add_buildable.buildTransaction({ - 'from': self.address, - 'gas': gas, - 'gasPrice': self.gas_price, - 'chainId': chain_spec.chain_id(), - 'nonce': self.next_nonce(uuid, session=session), - 'value': 0, - }) - return tx_add +#class AccountTxFactory(TxFactory): +# """Factory for creating account index contract transactions +# """ +# def add( +# self, +# address, +# chain_spec, +# uuid, +# session=None, +# ): +# """Register an Ethereum account address with the on-chain account registry +# +# :param address: Ethereum account address to add +# :type address: str, 0x-hex +# :param chain_spec: Chain to build transaction for +# :type chain_spec: cic_registry.chain.ChainSpec +# :returns: Unsigned "AccountRegistry.add" transaction in standard Ethereum format +# :rtype: dict +# """ +# +# c = self.registry.get_contract(chain_spec, 'AccountRegistry') +# f = c.function('add') +# tx_add_buildable = f( +# address, +# ) +# gas = c.gas('add') +# tx_add = tx_add_buildable.buildTransaction({ +# 'from': self.address, +# 'gas': gas, +# 'gasPrice': self.gas_price, +# 'chainId': chain_spec.chain_id(), +# 'nonce': self.next_nonce(uuid, session=session), +# 'value': 0, +# }) +# return tx_add +# +# +# def gift( +# self, +# address, +# chain_spec, +# uuid, +# session=None, +# ): +# """Trigger the on-chain faucet to disburse tokens to the provided Ethereum account +# +# :param address: Ethereum account address to gift to +# :type address: str, 0x-hex +# :param chain_spec: Chain to build transaction for +# :type chain_spec: cic_registry.chain.ChainSpec +# :returns: Unsigned "Faucet.giveTo" transaction in standard Ethereum format +# :rtype: dict +# """ +# +# c = self.registry.get_contract(chain_spec, 'Faucet') +# f = c.function('giveTo') +# tx_add_buildable = f(address) +# gas = c.gas('add') +# tx_add = tx_add_buildable.buildTransaction({ +# 'from': self.address, +# 'gas': gas, +# 'gasPrice': self.gas_price, +# 'chainId': chain_spec.chain_id(), +# 'nonce': self.next_nonce(uuid, session=session), +# 'value': 0, +# }) +# return tx_add def unpack_register(data): @@ -231,8 +236,9 @@ def register(self, account_address, chain_spec_dict, writer_address=None): # Generate and sign transaction rpc_signer = RPCConnection.connect(chain_spec, 'signer') - nonce_oracle = self.create_nonce_oracle(writer_address, rpc) - gas_oracle = self.create_gas_oracle(rpc) + #nonce_oracle = self.create_nonce_oracle(writer_address, rpc) + nonce_oracle = CustodialTaskNonceOracle(writer_address, self.request.root_id, session=session) #, default_nonce) + gas_oracle = self.create_gas_oracle(rpc, AccountRegistry.gas) account_registry = AccountRegistry(signer=rpc_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=chain_spec.chain_id()) (tx_hash_hex, tx_signed_raw_hex) = account_registry.add(account_registry_address, writer_address, account_address, tx_format=TxFormat.RLP_SIGNED) # TODO: if cache task fails, task chain will not return @@ -246,7 +252,8 @@ def register(self, account_address, chain_spec_dict, writer_address=None): #gas_budget = tx_add['gas'] * tx_add['gasPrice'] - gas_budget = account_registry.gas(tx_signed_raw_hex) + gas_pair = gas_oracle.get_gas(tx_signed_raw_hex) + gas_budget = gas_pair[0] * gas_pair[1] logg.debug('register user tx {} {} {}'.format(tx_hash_hex, queue, gas_budget)) s = create_check_gas_task( @@ -348,7 +355,7 @@ def cache_gift_data( self, tx_hash_hex, tx_signed_raw_hex, - chain_str, + chain_spec, ): """Generates and commits transaction cache metadata for a Faucet.giveTo transaction @@ -394,7 +401,7 @@ def cache_account_data( self, tx_hash_hex, tx_signed_raw_hex, - chain_str, + chain_spec, ): """Generates and commits transaction cache metadata for an AccountsIndex.add transaction @@ -408,11 +415,12 @@ def cache_account_data( :rtype: tuple """ - chain_spec = ChainSpec.from_chain_str(chain_str) - c = RpcClient(chain_spec) + #c = RpcClient(chain_spec) + return tx_signed_raw_bytes = bytes.fromhex(tx_signed_raw_hex[2:]) - tx = unpack_signed_raw_tx(tx_signed_raw_bytes, chain_spec.chain_id()) + #tx = unpack_signed_raw_tx(tx_signed_raw_bytes, chain_spec.chain_id()) + tx = unpack(tx_signed_raw_bytes, chain_id=chain_spec.chain_id()) tx_data = unpack_register(tx['data']) session = SessionBase.create_session() diff --git a/apps/cic-eth/cic_eth/eth/gas.py b/apps/cic-eth/cic_eth/eth/gas.py index 1be4dc6f..f53b1cb3 100644 --- a/apps/cic-eth/cic_eth/eth/gas.py +++ b/apps/cic-eth/cic_eth/eth/gas.py @@ -2,6 +2,7 @@ import logging # external imports +import celery from chainlib.eth.gas import price from hexathon import strip_0x @@ -82,3 +83,50 @@ class GasOracle(): #g = 100 #return g return self.gas_price_current + + +def create_check_gas_task(tx_signed_raws_hex, chain_spec, holder_address, gas=None, tx_hashes_hex=None, queue=None): + """Creates a celery task signature for a check_gas task that adds the task to the outgoing queue to be processed by the dispatcher. + + If tx_hashes_hex is not spefified, a preceding task chained to check_gas must supply the transaction hashes as its return value. + + :param tx_signed_raws_hex: Raw signed transaction data + :type tx_signed_raws_hex: list of str, 0x-hex + :param chain_spec: Chain spec of address to add check gas for + :type chain_spec: chainlib.chain.ChainSpec + :param holder_address: Address sending the transactions + :type holder_address: str, 0x-hex + :param gas: Gas budget hint for transactions + :type gas: int + :param tx_hashes_hex: Transaction hashes + :type tx_hashes_hex: list of str, 0x-hex + :param queue: Task queue + :type queue: str + :returns: Signature of task chain + :rtype: celery.Signature + """ + s_check_gas = None + if tx_hashes_hex != None: + s_check_gas = celery.signature( + 'cic_eth.eth.tx.check_gas', + [ + tx_hashes_hex, + chain_spec.asdict(), + tx_signed_raws_hex, + holder_address, + gas, + ], + queue=queue, + ) + else: + s_check_gas = celery.signature( + 'cic_eth.eth.tx.check_gas', + [ + chain_spec.asdict(), + tx_signed_raws_hex, + holder_address, + gas, + ], + queue=queue, + ) + return s_check_gas diff --git a/apps/cic-eth/cic_eth/eth/nonce.py b/apps/cic-eth/cic_eth/eth/nonce.py index 79e00400..729d033c 100644 --- a/apps/cic-eth/cic_eth/eth/nonce.py +++ b/apps/cic-eth/cic_eth/eth/nonce.py @@ -4,7 +4,7 @@ from cic_eth.db.models.nonce import ( NonceReservation, ) -class NonceOracle(): +class CustodialTaskNonceOracle(): """Ensures atomic nonce increments for all transactions across all tasks and threads. :param address: Address to generate nonces for @@ -12,20 +12,20 @@ class NonceOracle(): :param default_nonce: Initial nonce value to use if no nonce cache entry already exists :type default_nonce: number """ - def __init__(self, address, default_nonce): + def __init__(self, address, uuid, session=None): self.address = address - self.default_nonce = default_nonce + self.uuid = uuid + self.session = session - def next(self): + def get_nonce(self): + return self.next_nonce() + + + def next_nonce(self): """Get next unique nonce. :returns: Nonce :rtype: number """ - raise AttributeError('this should not be called') - return Nonce.next(self.address, self.default_nonce) - - - def next_by_task_uuid(self, uuid, session=None): - return NonceReservation.release(uuid, session=session) + return NonceReservation.release(self.address, self.uuid, session=self.session) diff --git a/apps/cic-eth/cic_eth/eth/task.py b/apps/cic-eth/cic_eth/eth/task.py deleted file mode 100644 index 2dfeb300..00000000 --- a/apps/cic-eth/cic_eth/eth/task.py +++ /dev/null @@ -1,148 +0,0 @@ -# standard imports -import logging - -# external imports -import sha3 -import celery -from chainlib.chain import ChainSpec -from chainlib.eth.sign import sign_transaction -from chainlib.connection import RPCConnection -from chainlib.eth.tx import unpack -from hexathon import ( - strip_0x, - add_0x, - ) - -# local imports -from cic_eth.eth import RpcClient -from cic_eth.queue.tx import create as queue_create -from cic_eth.error import SignerError - -celery_app = celery.current_app -logg = celery_app.log.get_default_logger() - -# -#@celery_app.task() -#def sign_tx(tx, chain_str): -# """Sign a single transaction against the given chain specification. -# -# :param tx: Transaction in standard Ethereum format -# :type tx: dict -# :param chain_str: Chain spec string representation -# :type chain_str: str -# :returns: Transaction hash and raw signed transaction, respectively -# :rtype: tuple -# """ -# chain_spec = ChainSpec.from_chain_str(chain_str) -# #c = RpcClient(chain_spec) -# tx_transfer_signed = None -# conn = RPCConnection.connect('signer') -# try: -# o = sign_transaction(tx) -# tx_transfer_signed = conn.do(o) -# #try: -# # tx_transfer_signed = c.w3.eth.sign_transaction(tx) -# except Exception as e: -# raise SignerError('sign tx {}: {}'.format(tx, e)) -# logg.debug('tx_transfer_signed {}'.format(tx_transfer_signed)) -# h = sha3.keccak_256() -# h.update(bytes.fromhex(strip_0x(tx_transfer_signed['raw']))) -# tx_hash = h.digest() -# #tx_hash = c.w3.keccak(hexstr=tx_transfer_signed['raw']) -# tx_hash_hex = add_0x(tx_hash.hex()) -# return (tx_hash_hex, tx_transfer_signed['raw'],) - - -#def sign_and_register_tx(tx, chain_str, queue, cache_task=None, session=None): -def register_tx(tx_hash_hex, tx_signed_raw_hex, chain_spec, queue, cache_task=None, session=None): - """Signs the provided transaction, and adds it to the transaction queue cache (with status PENDING). - - :param tx: Standard ethereum transaction data - :type tx: dict - :param chain_spec: Chain spec of transaction to add to queue - :type chain_spec: chainlib.chain.ChainSpec - :param queue: Task queue - :type queue: str - :param cache_task: Cache task to call with signed transaction. If None, no task will be called. - :type cache_task: str - :raises: sqlalchemy.exc.DatabaseError - :returns: Tuple; Transaction hash, signed raw transaction data - :rtype: tuple - """ - #(tx_hash_hex, tx_signed_raw_hex) = sign_tx(tx, chain_str) - - logg.debug('adding queue tx {}'.format(tx_hash_hex)) - tx_signed_raw = bytes.fromhex(strip_0x(tx_signed_raw_hex)) - tx = unpack(tx_signed_raw, chain_id=chain_spec.chain_id()) - - queue_create( - tx['nonce'], - tx['from'], - tx_hash_hex, - tx_signed_raw_hex, - chain_spec, - session=session, - ) - - if cache_task != None: - logg.debug('adding cache task {} tx {}'.format(cache_task, tx_hash_hex)) - s_cache = celery.signature( - cache_task, - [ - tx_hash_hex, - tx_signed_raw_hex, - chain_spec.asdict(), - ], - queue=queue, - ) - s_cache.apply_async() - - return (tx_hash_hex, tx_signed_raw_hex,) - - -# TODO: rename as we will not be sending task in the chain, this is the responsibility of the dispatcher -def create_check_gas_task(tx_signed_raws_hex, chain_spec, holder_address, gas=None, tx_hashes_hex=None, queue=None): - """Creates a celery task signature for a check_gas task that adds the task to the outgoing queue to be processed by the dispatcher. - - If tx_hashes_hex is not spefified, a preceding task chained to check_gas must supply the transaction hashes as its return value. - - :param tx_signed_raws_hex: Raw signed transaction data - :type tx_signed_raws_hex: list of str, 0x-hex - :param chain_spec: Chain spec of address to add check gas for - :type chain_spec: chainlib.chain.ChainSpec - :param holder_address: Address sending the transactions - :type holder_address: str, 0x-hex - :param gas: Gas budget hint for transactions - :type gas: int - :param tx_hashes_hex: Transaction hashes - :type tx_hashes_hex: list of str, 0x-hex - :param queue: Task queue - :type queue: str - :returns: Signature of task chain - :rtype: celery.Signature - """ - s_check_gas = None - if tx_hashes_hex != None: - s_check_gas = celery.signature( - 'cic_eth.eth.tx.check_gas', - [ - tx_hashes_hex, - chain_spec.asdict(), - tx_signed_raws_hex, - holder_address, - gas, - ], - queue=queue, - ) - else: - s_check_gas = celery.signature( - 'cic_eth.eth.tx.check_gas', - [ - chain_spec.asdict(), - tx_signed_raws_hex, - holder_address, - gas, - ], - queue=queue, - ) - return s_check_gas diff --git a/apps/cic-eth/cic_eth/eth/tx.py b/apps/cic-eth/cic_eth/eth/tx.py index 91732037..5a68d0e3 100644 --- a/apps/cic-eth/cic_eth/eth/tx.py +++ b/apps/cic-eth/cic_eth/eth/tx.py @@ -16,15 +16,27 @@ from chainlib.eth.tx import ( transaction, receipt, raw, + TxFormat, + unpack, ) from chainlib.connection import RPCConnection from chainlib.hash import keccak256_hex_to_hex -from hexathon import add_0x - +from chainlib.eth.gas import Gas +from chainlib.eth.contract import ( + abi_decode_single, + ABIContractType, + ) +from hexathon import ( + add_0x, + strip_0x, + ) # local imports from .rpc import RpcClient -from cic_eth.db import Otx, SessionBase +from cic_eth.db import ( + Otx, + SessionBase, + ) from cic_eth.db.models.tx import TxCache from cic_eth.db.models.nonce import NonceReservation from cic_eth.db.models.lock import Lock @@ -36,18 +48,18 @@ from cic_eth.db.enum import ( from cic_eth.error import PermanentTxError from cic_eth.error import TemporaryTxError from cic_eth.error import NotLocalTxError -from cic_eth.queue.tx import create as queue_create -from cic_eth.queue.tx import get_tx -from cic_eth.queue.tx import get_nonce_tx +#from cic_eth.queue.tx import create as queue_create +from cic_eth.queue.tx import ( + get_tx, + register_tx, + get_nonce_tx, + ) from cic_eth.error import OutOfGasError from cic_eth.error import LockedError -from cic_eth.eth.util import unpack_signed_raw_tx -from cic_eth.eth.task import ( - register_tx, +from cic_eth.eth.gas import ( create_check_gas_task, - #sign_tx, ) -from cic_eth.eth.nonce import NonceOracle +from cic_eth.eth.nonce import CustodialTaskNonceOracle from cic_eth.error import ( AlreadyFillingGasError, EthError, @@ -112,6 +124,7 @@ def check_gas(self, tx_hashes, chain_spec_dict, txs=[], address=None, gas_requir try: o = balance(address) r = conn.do(o) + gas_balance = abi_decode_single(ABIContractType.UINT256, r) except EthException as e: raise EthError('gas_balance call for {}: {}'.format(address, e)) @@ -290,7 +303,7 @@ def send(self, txs, chain_spec_dict): ) s.apply_async() - return r.hex() + return tx_hash_hex # TODO: if this method fails the nonce will be out of sequence. session needs to be extended to include the queue create, so that nonce is rolled back if the second sql query fails. Better yet, split each state change into separate tasks. @@ -319,70 +332,60 @@ def refill_gas(self, recipient_address, chain_spec_dict): q = q.filter(TxCache.recipient==recipient_address) c = q.count() if c > 0: - #session.close() - #raise AlreadyFillingGasError(recipient_address) logg.warning('already filling gas {}'.format(str(AlreadyFillingGasError(recipient_address)))) zero_amount = True session.flush() - queue = self.request.delivery_info['routing_key'] + queue = self.request.delivery_info.get('routing_key') - c = RpcClient(chain_spec) - conn = RPCConnection.connect() - logg.debug('refill gas from provider address {}'.format(c.gas_provider())) + #c = RpcClient(chain_spec) + rpc = RPCConnection.connect(chain_spec, 'default') + + gas_provider = AccountRole.get_address('GAS_GIFTER', session=session) + session.flush() # Get default nonce to use from network if no nonce has been set # TODO: This step may be redundant as nonce entry is set at account creation time #default_nonce = c.w3.eth.getTransactionCount(c.gas_provider(), 'pending') - o = count_pending(c.gas_provider()) - default_nonce = conn.do(o) + #o = count_pending(gas_provider) + #default_nonce = conn.do(o) - nonce_generator = NonceOracle(c.gas_provider(), default_nonce) + nonce_oracle = CustodialTaskNonceOracle(gas_provider, self.request.root_id, session=session) #, default_nonce) #nonce = nonce_generator.next(session=session) - nonce = nonce_generator.next_by_task_uuid(self.request.root_id, session=session) - gas_price = c.gas_price() - gas_limit = c.default_gas_limit + #nonce = nonce_generator.next_by_task_uuid(self.request.root_id, session=session) + rpc_signer = RPCConnection.connect(chain_spec, 'signer') + gas_oracle = self.create_gas_oracle(rpc) + c = Gas(signer=rpc_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=chain_spec.chain_id()) + #gas_price = c.gas_price() + #gas_limit = c.default_gas_limit refill_amount = 0 if not zero_amount: - refill_amount = c.refill_amount() - logg.debug('tx send gas price {} nonce {}'.format(gas_price, nonce)) + refill_amount = self.safe_gas_refill_amount - # create and sign transaction - tx_send_gas = { - 'from': c.gas_provider(), - 'to': recipient_address, - 'gas': gas_limit, - 'gasPrice': gas_price, - 'chainId': chain_spec.chain_id(), - 'nonce': nonce, - 'value': refill_amount, - 'data': '', - } - #tx_send_gas_signed = c.w3.eth.sign_transaction(tx_send_gas) - #tx_hash = web3.Web3.keccak(hexstr=tx_send_gas_signed['raw']) - #tx_hash_hex = tx_hash.hex() - (tx_hash_hex, tx_send_gas_signed) = sign_tx(tx_send_gas) + logg.debug('tx send gas amount {} from provider {} to {}'.format(refill_amount, gas_provider, recipient_address)) +# # create and sign transaction +# tx_send_gas = { +# 'from': c.gas_provider(), +# 'to': recipient_address, +# 'gas': gas_limit, +# 'gasPrice': gas_price, +# 'chainId': chain_spec.chain_id(), +# 'nonce': nonce, +# 'value': refill_amount, +# 'data': '', +# } +# #tx_send_gas_signed = c.w3.eth.sign_transaction(tx_send_gas) +# #tx_hash = web3.Web3.keccak(hexstr=tx_send_gas_signed['raw']) +# #tx_hash_hex = tx_hash.hex() +# (tx_hash_hex, tx_send_gas_signed) = sign_tx(tx_send_gas) + (tx_hash_hex, tx_signed_raw_hex) = c.create(gas_provider, recipient_address, refill_amount, tx_format=TxFormat.RLP_SIGNED) # TODO: route this through sign_and_register_tx instead logg.debug('adding queue refill gas tx {}'.format(tx_hash_hex)) - queue_create( - nonce, - c.gas_provider(), - tx_hash_hex, - tx_send_gas_signed['raw'], - chain_str, - session=session, - ) - session.close() + #cache_task = 'cic_eth.eth.tx.cache_gas_refill_data' + cache_task = 'cic_eth.eth.tx.otx_cache_parse_tx' + register_tx(tx_hash_hex, tx_signed_raw_hex, chain_spec, queue, cache_task=cache_task, session=session) - s_tx_cache = celery.signature( - 'cic_eth.eth.tx.cache_gas_refill_data', - [ - tx_hash_hex, - tx_send_gas, - ], - queue=queue, - ) s_status = celery.signature( 'cic_eth.queue.tx.set_ready', [ @@ -390,9 +393,9 @@ def refill_gas(self, recipient_address, chain_spec_dict): ], queue=queue, ) - celery.group(s_tx_cache, s_status)() + t = s_status.apply_async() - return tx_send_gas_signed['raw'] + return tx_signed_raw_hex @celery_app.task(bind=True, base=CriticalSQLAlchemyAndSignerTask) @@ -413,7 +416,6 @@ def resend_with_higher_gas(self, txold_hash_hex, chain_str, gas=None, default_fa """ session = SessionBase.create_session() - q = session.query(Otx) q = q.filter(Otx.tx_hash==txold_hash_hex) otx = q.first() @@ -603,7 +605,7 @@ def resume_tx(self, txpending_hash_hex, chain_str): def otx_cache_parse_tx( tx_hash_hex, tx_signed_raw_hex, - chain_str, + chain_spec_dict, ): """Generates and commits transaction cache metadata for a gas refill transaction @@ -618,18 +620,20 @@ def otx_cache_parse_tx( """ - chain_spec = ChainSpec.from_chain_str(chain_str) - c = RpcClient(chain_spec) - tx_signed_raw_bytes = bytes.fromhex(tx_signed_raw_hex[2:]) - tx = unpack_signed_raw_tx(tx_signed_raw_bytes, chain_spec.chain_id()) - (txc, cache_id) = cache_gas_refill_data(tx_hash_hex, tx) + chain_spec = ChainSpec.from_dict(chain_spec_dict) + #c = RpcClient(chain_spec) + #tx = unpack_signed_raw_tx(tx_signed_raw_bytes, chain_spec.chain_id()) + tx_signed_raw_bytes = bytes.fromhex(strip_0x(tx_signed_raw_hex)) + tx = unpack(tx_signed_raw_bytes, chain_spec.chain_id()) + (txc, cache_id) = cache_gas_refill_data(tx_hash_hex, tx, chain_spec) return txc -@celery_app.task(base=CriticalSQLAlchemyTask) +#@celery_app.task(base=CriticalSQLAlchemyTask) def cache_gas_refill_data( tx_hash_hex, tx, + chain_spec, ): """Helper function for otx_cache_parse_tx diff --git a/apps/cic-eth/cic_eth/queue/tx.py b/apps/cic-eth/cic_eth/queue/tx.py index 55bdba3a..3b44f10b 100644 --- a/apps/cic-eth/cic_eth/queue/tx.py +++ b/apps/cic-eth/cic_eth/queue/tx.py @@ -3,13 +3,14 @@ import logging import time import datetime -# third-party imports +# external imports import celery from hexathon import strip_0x from sqlalchemy import or_ from sqlalchemy import not_ from sqlalchemy import tuple_ from sqlalchemy import func +from chainlib.eth.tx import unpack # local imports from cic_eth.db.models.otx import Otx @@ -95,6 +96,50 @@ def create(nonce, holder_address, tx_hash, signed_tx, chain_spec, obsolete_prede return tx_hash +def register_tx(tx_hash_hex, tx_signed_raw_hex, chain_spec, queue, cache_task=None, session=None): + """Signs the provided transaction, and adds it to the transaction queue cache (with status PENDING). + + :param tx: Standard ethereum transaction data + :type tx: dict + :param chain_spec: Chain spec of transaction to add to queue + :type chain_spec: chainlib.chain.ChainSpec + :param queue: Task queue + :type queue: str + :param cache_task: Cache task to call with signed transaction. If None, no task will be called. + :type cache_task: str + :raises: sqlalchemy.exc.DatabaseError + :returns: Tuple; Transaction hash, signed raw transaction data + :rtype: tuple + """ + logg.debug('adding queue tx {}:{} -> {}'.format(chain_spec, tx_hash_hex, tx_signed_raw_hex)) + tx_signed_raw = bytes.fromhex(strip_0x(tx_signed_raw_hex)) + tx = unpack(tx_signed_raw, chain_id=chain_spec.chain_id()) + + create( + tx['nonce'], + tx['from'], + tx_hash_hex, + tx_signed_raw_hex, + chain_spec, + session=session, + ) + + if cache_task != None: + logg.debug('adding cache task {} tx {}'.format(cache_task, tx_hash_hex)) + s_cache = celery.signature( + cache_task, + [ + tx_hash_hex, + tx_signed_raw_hex, + chain_spec.asdict(), + ], + queue=queue, + ) + s_cache.apply_async() + + return (tx_hash_hex, tx_signed_raw_hex,) + + # TODO: Replace set_* with single task for set status @celery_app.task(base=CriticalSQLAlchemyTask) def set_sent_status(tx_hash, fail=False): diff --git a/apps/cic-eth/cic_eth/task.py b/apps/cic-eth/cic_eth/task.py index 5fd0d0e2..790950cc 100644 --- a/apps/cic-eth/cic_eth/task.py +++ b/apps/cic-eth/cic_eth/task.py @@ -53,6 +53,7 @@ class CriticalWeb3Task(CriticalTask): requests.exceptions.ConnectionError, ) safe_gas_threshold_amount = 2000000000 * 60000 * 3 + safe_gas_refill_amount = safe_gas_threshold_amount * 5 class CriticalSQLAlchemyAndWeb3Task(CriticalTask): @@ -64,6 +65,8 @@ class CriticalSQLAlchemyAndWeb3Task(CriticalTask): EthError, ) safe_gas_threshold_amount = 2000000000 * 60000 * 3 + safe_gas_refill_amount = safe_gas_threshold_amount * 5 + class CriticalSQLAlchemyAndSignerTask(CriticalTask): autoretry_for = ( @@ -78,6 +81,8 @@ class CriticalWeb3AndSignerTask(CriticalTask): requests.exceptions.ConnectionError, SignerError, ) + safe_gas_threshold_amount = 2000000000 * 60000 * 3 + safe_gas_refill_amount = safe_gas_threshold_amount * 5 @celery_app.task(bind=True, base=BaseTask) diff --git a/apps/cic-eth/tests/conftest.py b/apps/cic-eth/tests/conftest.py index ca4fb9c6..a858a034 100644 --- a/apps/cic-eth/tests/conftest.py +++ b/apps/cic-eth/tests/conftest.py @@ -8,7 +8,6 @@ root_dir = os.path.dirname(script_dir) sys.path.insert(0, root_dir) from tests.fixtures_config import * -from tests.fixtures_celery import * from tests.fixtures_database import * from tests.fixtures_role import * from chainlib.eth.pytest import * diff --git a/apps/cic-eth/tests/fixtures_celery.py b/apps/cic-eth/tests/fixtures_celery.py index 45d22971..1ad05846 100644 --- a/apps/cic-eth/tests/fixtures_celery.py +++ b/apps/cic-eth/tests/fixtures_celery.py @@ -13,9 +13,9 @@ logg = logging.getLogger() def celery_includes(): return [ # 'cic_eth.eth.bancor', - 'cic_eth.eth.token', +# 'cic_eth.eth.token', 'cic_eth.eth.tx', - 'cic_eth.ext.tx', +# 'cic_eth.ext.tx', 'cic_eth.queue.tx', 'cic_eth.queue.balance', 'cic_eth.admin.ctrl', diff --git a/apps/cic-eth/tests/fixtures_role.py b/apps/cic-eth/tests/fixtures_role.py index 588af53f..6df64296 100644 --- a/apps/cic-eth/tests/fixtures_role.py +++ b/apps/cic-eth/tests/fixtures_role.py @@ -11,6 +11,7 @@ from cic_eth.db.models.role import AccountRole #logg = logging.getLogger(__name__) # what the actual fuck, debug is not being shown even though explicitly set +logging.basicConfig(level=logging.DEBUG) logg = logging.getLogger() @@ -29,7 +30,7 @@ def custodial_roles( for k in r.keys(): role = AccountRole.set(k, r[k]) init_database.add(role) - logg.info('adding role {} -> {}'.format(k, r[k])) + logg.error('adding role {} -> {}'.format(k, r[k])) init_database.commit() return r diff --git a/apps/cic-eth/tests/task/conftest.py b/apps/cic-eth/tests/task/conftest.py new file mode 100644 index 00000000..210aa07f --- /dev/null +++ b/apps/cic-eth/tests/task/conftest.py @@ -0,0 +1 @@ +from tests.fixtures_celery import * diff --git a/apps/cic-eth/tests/task/test_account.py b/apps/cic-eth/tests/task/test_account.py index dd55e699..11575202 100644 --- a/apps/cic-eth/tests/task/test_account.py +++ b/apps/cic-eth/tests/task/test_account.py @@ -18,7 +18,6 @@ from cic_eth.db.enum import StatusEnum from cic_eth.db.enum import StatusEnum from cic_eth.db.models.nonce import Nonce from cic_eth.db.models.role import AccountRole -from cic_eth.eth.account import AccountTxFactory logg = logging.getLogger() @@ -35,7 +34,7 @@ def test_create_account( 'cic_eth.eth.account.create', [ 'foo', - str(default_chain_spec), + default_chain_spec.asdict(), ], ) t = s.apply_async() @@ -52,7 +51,7 @@ def test_create_account( 'cic_eth.eth.account.have', [ r, - str(default_chain_spec), + default_chain_spec.asdict(), ], ) t = s.apply_async() @@ -73,8 +72,6 @@ def test_register_account( celery_worker, ): - logg.debug('chainspec {}'.format(str(default_chain_spec))) - s_nonce = celery.signature( 'cic_eth.eth.tx.reserve_nonce', [ @@ -95,7 +92,7 @@ def test_register_account( t = s_nonce.apply_async() address = t.get() for r in t.collect(): - pass + logg.debug('r {}'.format(r)) assert t.successful() session = SessionBase.create_session()