diff --git a/apps/cic-eth/cic_eth/eth/tx.py b/apps/cic-eth/cic_eth/eth/tx.py index e64bcb1a..663f2259 100644 --- a/apps/cic-eth/cic_eth/eth/tx.py +++ b/apps/cic-eth/cic_eth/eth/tx.py @@ -529,82 +529,54 @@ def sync_tx(self, tx_hash_hex, chain_str): s.apply_async() - -@celery_app.task(bind=True) -def resume_tx(self, txpending_hash_hex, chain_str): - """Queue a suspended tranaction for (re)sending - - :param txpending_hash_hex: Transaction hash - :type txpending_hash_hex: str, 0x-hex - :param chain_str: Chain spec, string representation - :type chain_str: str - :raises NotLocalTxError: Transaction does not exist in the local queue - :returns: Transaction hash - :rtype: str, 0x-hex - """ - - chain_spec = ChainSpec.from_chain_str(chain_str) - - session = SessionBase.create_session() - q = session.query(Otx.signed_tx) - q = q.filter(Otx.tx_hash==txpending_hash_hex) - r = q.first() - session.close() - if r == None: - raise NotLocalTxError(txpending_hash_hex) - - tx_signed_raw_hex = r[0] - tx_signed_bytes = bytes.fromhex(tx_signed_raw_hex[2:]) - tx = unpack_signed_raw_tx(tx_signed_bytes, chain_spec.chain_id()) - - queue = self.request.delivery_info['routing_key'] - - s = create_check_gas_and_send_task( - [tx_signed_raw_hex], - chain_str, - tx['from'], - tx['gasPrice'] * tx['gas'], - [txpending_hash_hex], - queue=queue, - ) - s.apply_async() - return txpending_hash_hex +# +#@celery_app.task(bind=True) +#def resume_tx(self, txpending_hash_hex, chain_str): +# """Queue a suspended tranaction for (re)sending +# +# :param txpending_hash_hex: Transaction hash +# :type txpending_hash_hex: str, 0x-hex +# :param chain_str: Chain spec, string representation +# :type chain_str: str +# :raises NotLocalTxError: Transaction does not exist in the local queue +# :returns: Transaction hash +# :rtype: str, 0x-hex +# """ +# +# chain_spec = ChainSpec.from_chain_str(chain_str) +# +# session = SessionBase.create_session() +# q = session.query(Otx.signed_tx) +# q = q.filter(Otx.tx_hash==txpending_hash_hex) +# r = q.first() +# session.close() +# if r == None: +# raise NotLocalTxError(txpending_hash_hex) +# +# tx_signed_raw_hex = r[0] +# tx_signed_bytes = bytes.fromhex(tx_signed_raw_hex[2:]) +# tx = unpack_signed_raw_tx(tx_signed_bytes, chain_spec.chain_id()) +# +# queue = self.request.delivery_info['routing_key'] +# +# s = create_check_gas_and_send_task( +# [tx_signed_raw_hex], +# chain_str, +# tx['from'], +# tx['gasPrice'] * tx['gas'], +# [txpending_hash_hex], +# queue=queue, +# ) +# s.apply_async() +# return txpending_hash_hex @celery_app.task(base=CriticalSQLAlchemyTask) -def otx_cache_parse_tx( +def cache_gas_data( tx_hash_hex, tx_signed_raw_hex, chain_spec_dict, ): - """Generates and commits transaction cache metadata for a gas refill transaction - - :param tx_hash_hex: Transaction hash - :type tx_hash_hex: str, 0x-hex - :param tx_signed_raw_hex: Raw signed transaction - :type tx_signed_raw_hex: str, 0x-hex - :param chain_str: Chain spec string representation - :type chain_str: str - :returns: Transaction hash and id of cache element in storage backend, respectively - :rtype: tuple - """ - - - chain_spec = ChainSpec.from_dict(chain_spec_dict) - #c = RpcClient(chain_spec) - #tx = unpack_signed_raw_tx(tx_signed_raw_bytes, chain_spec.chain_id()) - tx_signed_raw_bytes = bytes.fromhex(strip_0x(tx_signed_raw_hex)) - tx = unpack(tx_signed_raw_bytes, chain_spec.chain_id()) - (txc, cache_id) = cache_gas_refill_data(tx_hash_hex, tx, chain_spec) - return txc - - -#@celery_app.task(base=CriticalSQLAlchemyTask) -def cache_gas_refill_data( - tx_hash_hex, - tx, - chain_spec, - ): """Helper function for otx_cache_parse_tx :param tx_hash_hex: Transaction hash @@ -614,6 +586,10 @@ def cache_gas_refill_data( :returns: Transaction hash and id of cache element in storage backend, respectively :rtype: tuple """ + chain_spec = ChainSpec.from_dict(chain_spec_dict) + tx_signed_raw_bytes = bytes.fromhex(strip_0x(tx_signed_raw_hex)) + tx = unpack(tx_signed_raw_bytes, chain_spec.chain_id()) + tx_cache = TxCache( tx_hash_hex, tx['from'], diff --git a/apps/cic-eth/tests/unit/queue/test_balances.py b/apps/cic-eth/tests/unit/queue/test_balances.py new file mode 100644 index 00000000..168401c9 --- /dev/null +++ b/apps/cic-eth/tests/unit/queue/test_balances.py @@ -0,0 +1,158 @@ +# standard imports +import os +import logging + +# third-party imports +import pytest + +# local imports +from cic_eth.db.models.otx import Otx +from cic_eth.db.models.tx import TxCache +from cic_eth.queue.balance import ( + balance_outgoing, + balance_incoming, + assemble_balances, + ) + +logg = logging.getLogger() + + +def test_assemble(): + + token_foo = '0x' + os.urandom(20).hex() + token_bar = '0x' + os.urandom(20).hex() + b = [ + [ + { + 'address': token_foo, + 'converters': [], + 'balance_foo': 42, + }, + { + 'address': token_bar, + 'converters': [], + 'balance_baz': 666, + }, + ], + [ + { + 'address': token_foo, + 'converters': [], + 'balance_bar': 13, + }, + + { + 'address': token_bar, + 'converters': [], + 'balance_xyzzy': 1337, + } + ] + ] + r = assemble_balances(b) + logg.debug('r {}'.format(r)) + + assert r[0]['address'] == token_foo + assert r[1]['address'] == token_bar + assert r[0].get('balance_foo') != None + assert r[0].get('balance_bar') != None + assert r[1].get('balance_baz') != None + assert r[1].get('balance_xyzzy') != None + + +@pytest.mark.skip() +def test_outgoing_balance( + default_chain_spec, + init_database, + ): + + chain_str = str(default_chain_spec) + recipient = '0x' + os.urandom(20).hex() + tx_hash = '0x' + os.urandom(32).hex() + signed_tx = '0x' + os.urandom(128).hex() + otx = Otx.add(0, recipient, tx_hash, signed_tx, session=init_database) + init_database.add(otx) + init_database.commit() + + token_address = '0x' + os.urandom(20).hex() + sender = '0x' + os.urandom(20).hex() + txc = TxCache( + tx_hash, + sender, + recipient, + token_address, + token_address, + 1000, + 1000, + ) + init_database.add(txc) + init_database.commit() + + token_data = { + 'address': token_address, + 'converters': [], + } + b = balance_outgoing([token_data], sender, chain_str) + assert b[0]['balance_outgoing'] == 1000 + + otx.sent(session=init_database) + init_database.commit() + + b = balance_outgoing([token_data], sender, chain_str) + assert b[0]['balance_outgoing'] == 1000 + + otx.success(block=1024, session=init_database) + init_database.commit() + + b = balance_outgoing([token_data], sender, chain_str) + assert b[0]['balance_outgoing'] == 0 + + +@pytest.mark.skip() +def test_incoming_balance( + default_chain_spec, + init_database, + ): + + chain_str = str(default_chain_spec) + recipient = '0x' + os.urandom(20).hex() + tx_hash = '0x' + os.urandom(32).hex() + signed_tx = '0x' + os.urandom(128).hex() + otx = Otx.add(0, recipient, tx_hash, signed_tx, session=init_database) + init_database.add(otx) + init_database.commit() + + token_address = '0x' + os.urandom(20).hex() + sender = '0x' + os.urandom(20).hex() + txc = TxCache( + tx_hash, + sender, + recipient, + token_address, + token_address, + 1000, + 1000, + ) + init_database.add(txc) + init_database.commit() + + token_data = { + 'address': token_address, + 'converters': [], + } + b = balance_incoming([token_data], recipient, chain_str) + assert b[0]['balance_incoming'] == 0 + + otx.sent(session=init_database) + init_database.commit() + + b = balance_incoming([token_data], recipient, chain_str) + assert b[0]['balance_incoming'] == 1000 + + otx.success(block=1024, session=init_database) + init_database.commit() + + b = balance_incoming([token_data], recipient, chain_str) + assert b[0]['balance_incoming'] == 0 + + + diff --git a/apps/cic-eth/tests/unit/queue/test_list_tx.py b/apps/cic-eth/tests/unit/queue/test_list_tx.py new file mode 100644 index 00000000..397ed07e --- /dev/null +++ b/apps/cic-eth/tests/unit/queue/test_list_tx.py @@ -0,0 +1,76 @@ +# standard imports +import logging + +# external imports +from chainlib.connection import RPCConnection +from chainlib.eth.gas import RPCGasOracle +from chainlib.eth.nonce import RPCNonceOracle +from chainlib.eth.gas import Gas + +# local imports +from cic_eth.queue.tx import get_status_tx +from cic_eth.db.enum import ( + StatusEnum, + StatusBits, + ) +from cic_eth.queue.tx import create as queue_create +from cic_eth.eth.tx import cache_gas_data +from cic_eth.queue.tx import register_tx +from cic_eth.db.models.otx import Otx + +logg = logging.getLogger() + + +def test_status_tx_list( + default_chain_spec, + init_database, + eth_rpc, + eth_signer, + agent_roles, + ): + + rpc = RPCConnection.connect(default_chain_spec, 'default') + + nonce_oracle = RPCNonceOracle(agent_roles['ALICE'], eth_rpc) + gas_oracle = RPCGasOracle(eth_rpc) + c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=default_chain_spec.chain_id()) + + (tx_hash_hex, o) = c.create(agent_roles['ALICE'], agent_roles['BOB'], 1024) + r = rpc.do(o) + + tx_signed_raw_hex = o['params'][0] + #queue_create(tx['nonce'], tx['from'], tx_hash.hex(), tx_signed['raw'], str(default_chain_spec)) + register_tx(tx_hash_hex, tx_signed_raw_hex, default_chain_spec, None, session=init_database) + cache_gas_data(tx_hash_hex, tx_signed_raw_hex, default_chain_spec.asdict()) + + q = init_database.query(Otx) + otx = q.get(1) + otx.sendfail(session=init_database) + init_database.add(otx) + init_database.commit() + init_database.refresh(otx) + + txs = get_status_tx(StatusBits.LOCAL_ERROR, session=init_database) + assert len(txs) == 1 + + otx.sendfail(session=init_database) + otx.retry(session=init_database) + init_database.add(otx) + init_database.commit() + init_database.refresh(otx) + + txs = get_status_tx(StatusBits.LOCAL_ERROR, session=init_database) + assert len(txs) == 1 + + txs = get_status_tx(StatusBits.QUEUED, session=init_database) + assert len(txs) == 1 + + txs = get_status_tx(StatusBits.QUEUED, not_status=StatusBits.LOCAL_ERROR, session=init_database) + assert len(txs) == 0 + + txs = get_status_tx(StatusBits.QUEUED, not_status=StatusBits.IN_NETWORK, session=init_database) + assert len(txs) == 1 + + txs = get_status_tx(StatusBits.IN_NETWORK, session=init_database) + assert len(txs) == 0 +