From 964952e9040affe17c550bc180c8110c81aae986 Mon Sep 17 00:00:00 2001 From: nolash Date: Sat, 20 Mar 2021 21:10:19 +0100 Subject: [PATCH] Rehabilitate queue tests --- apps/cic-eth/cic_eth/admin/ctrl.py | 21 +- apps/cic-eth/cic_eth/queue/tx.py | 3 +- .../cic-eth/tests/unit/queue/test_tx_queue.py | 870 +++++++++--------- apps/cic-eth/tests/util/nonce.py | 12 + 4 files changed, 485 insertions(+), 421 deletions(-) create mode 100644 apps/cic-eth/tests/util/nonce.py diff --git a/apps/cic-eth/cic_eth/admin/ctrl.py b/apps/cic-eth/cic_eth/admin/ctrl.py index d7008592..0edeab0b 100644 --- a/apps/cic-eth/cic_eth/admin/ctrl.py +++ b/apps/cic-eth/cic_eth/admin/ctrl.py @@ -19,7 +19,7 @@ celery_app = celery.current_app logg = logging.getLogger() @celery_app.task(base=CriticalSQLAlchemyTask) -def lock(chained_input, chain_str, address=zero_address, flags=LockEnum.ALL, tx_hash=None): +def lock(chained_input, chain_spec_dict, address=zero_address, flags=LockEnum.ALL, tx_hash=None): """Task wrapper to set arbitrary locks :param chain_str: Chain spec string representation @@ -31,13 +31,14 @@ def lock(chained_input, chain_str, address=zero_address, flags=LockEnum.ALL, tx_ :returns: New lock state for address :rtype: number """ + chain_str = str(ChainSpec.from_dict(chain_spec_dict)) r = Lock.set(chain_str, flags, address=address, tx_hash=tx_hash) logg.debug('Locked {} for {}, flag now {}'.format(flags, address, r)) return chained_input @celery_app.task(base=CriticalSQLAlchemyTask) -def unlock(chained_input, chain_str, address=zero_address, flags=LockEnum.ALL): +def unlock(chained_input, chain_spec_dict, address=zero_address, flags=LockEnum.ALL): """Task wrapper to reset arbitrary locks :param chain_str: Chain spec string representation @@ -49,13 +50,14 @@ def unlock(chained_input, chain_str, address=zero_address, flags=LockEnum.ALL): :returns: New lock state for address :rtype: number """ + chain_str = str(ChainSpec.from_dict(chain_spec_dict)) r = Lock.reset(chain_str, flags, address=address) logg.debug('Unlocked {} for {}, flag now {}'.format(flags, address, r)) return chained_input @celery_app.task(base=CriticalSQLAlchemyTask) -def lock_send(chained_input, chain_str, address=zero_address, tx_hash=None): +def lock_send(chained_input, chain_spec_dict, address=zero_address, tx_hash=None): """Task wrapper to set send lock :param chain_str: Chain spec string representation @@ -65,13 +67,14 @@ def lock_send(chained_input, chain_str, address=zero_address, tx_hash=None): :returns: New lock state for address :rtype: number """ + chain_str = str(ChainSpec.from_dict(chain_spec_dict)) r = Lock.set(chain_str, LockEnum.SEND, address=address, tx_hash=tx_hash) logg.debug('Send locked for {}, flag now {}'.format(address, r)) return chained_input @celery_app.task(base=CriticalSQLAlchemyTask) -def unlock_send(chained_input, chain_str, address=zero_address): +def unlock_send(chained_input, chain_spec_dict, address=zero_address): """Task wrapper to reset send lock :param chain_str: Chain spec string representation @@ -81,13 +84,14 @@ def unlock_send(chained_input, chain_str, address=zero_address): :returns: New lock state for address :rtype: number """ + chain_str = str(ChainSpec.from_dict(chain_spec_dict)) r = Lock.reset(chain_str, LockEnum.SEND, address=address) logg.debug('Send unlocked for {}, flag now {}'.format(address, r)) return chained_input @celery_app.task(base=CriticalSQLAlchemyTask) -def lock_queue(chained_input, chain_str, address=zero_address, tx_hash=None): +def lock_queue(chained_input, chain_spec_dict, address=zero_address, tx_hash=None): """Task wrapper to set queue direct lock :param chain_str: Chain spec string representation @@ -97,13 +101,14 @@ def lock_queue(chained_input, chain_str, address=zero_address, tx_hash=None): :returns: New lock state for address :rtype: number """ + chain_str = str(ChainSpec.from_dict(chain_spec_dict)) r = Lock.set(chain_str, LockEnum.QUEUE, address=address, tx_hash=tx_hash) logg.debug('Queue direct locked for {}, flag now {}'.format(address, r)) return chained_input @celery_app.task(base=CriticalSQLAlchemyTask) -def unlock_queue(chained_input, chain_str, address=zero_address): +def unlock_queue(chained_input, chain_spec_dict, address=zero_address): """Task wrapper to reset queue direct lock :param chain_str: Chain spec string representation @@ -113,13 +118,15 @@ def unlock_queue(chained_input, chain_str, address=zero_address): :returns: New lock state for address :rtype: number """ + chain_str = str(ChainSpec.from_dict(chain_spec_dict)) r = Lock.reset(chain_str, LockEnum.QUEUE, address=address) logg.debug('Queue direct unlocked for {}, flag now {}'.format(address, r)) return chained_input @celery_app.task(base=CriticalSQLAlchemyTask) -def check_lock(chained_input, chain_str, lock_flags, address=None): +def check_lock(chained_input, chain_spec_dict, lock_flags, address=None): + chain_str = str(ChainSpec.from_dict(chain_spec_dict)) session = SessionBase.create_session() r = Lock.check(chain_str, lock_flags, address=zero_address, session=session) if address != None: diff --git a/apps/cic-eth/cic_eth/queue/tx.py b/apps/cic-eth/cic_eth/queue/tx.py index 3b44f10b..daf34d8e 100644 --- a/apps/cic-eth/cic_eth/queue/tx.py +++ b/apps/cic-eth/cic_eth/queue/tx.py @@ -68,13 +68,12 @@ def create(nonce, holder_address, tx_hash, signed_tx, chain_spec, obsolete_prede session.flush() if obsolete_predecessors: - # TODO: obsolete previous txs from same holder with same nonce q = session.query(Otx) q = q.join(TxCache) q = q.filter(Otx.nonce==nonce) q = q.filter(TxCache.sender==holder_address) q = q.filter(Otx.tx_hash!=tx_hash) - q = q.filter(Otx.status<=StatusEnum.SENT) + q = q.filter(Otx.status.op('&')(StatusBits.FINAL)==0) for otx in q.all(): logg.info('otx {} obsoleted by {}'.format(otx.tx_hash, tx_hash)) diff --git a/apps/cic-eth/tests/unit/queue/test_tx_queue.py b/apps/cic-eth/tests/unit/queue/test_tx_queue.py index 4102439a..385b3fb5 100644 --- a/apps/cic-eth/tests/unit/queue/test_tx_queue.py +++ b/apps/cic-eth/tests/unit/queue/test_tx_queue.py @@ -3,16 +3,18 @@ import datetime import os import logging -# third-party imports +# external imports import pytest from sqlalchemy import DateTime -#from cic_registry import CICRegistry from chainlib.connection import RPCConnection from chainlib.eth.nonce import OverrideNonceOracle +from chainlib.eth.tx import unpack from chainlib.eth.gas import ( RPCGasOracle, Gas, ) +from chainlib.eth.constant import ZERO_ADDRESS +from hexathon import strip_0x # local imports from cic_eth.eth.tx import cache_gas_data @@ -39,7 +41,10 @@ from cic_eth.queue.tx import get_upcoming_tx from cic_eth.queue.tx import get_account_tx from cic_eth.queue.tx import get_tx from cic_eth.db.error import TxStateChangeError -from cic_eth.queue.tx import registry_tx +from cic_eth.queue.tx import register_tx + +# test imports +from tests.util.nonce import StaticNonceOracle logg = logging.getLogger() @@ -53,7 +58,7 @@ def test_finalize( ): rpc = RPCConnection.connect(default_chain_spec, 'default') - nonce_oracle = OverrideNonceOracle(agent_roles['ALICE'], 0) + nonce_oracle = StaticNonceOracle(0) gas_oracle = RPCGasOracle(eth_rpc) c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=default_chain_spec.chain_id()) @@ -64,7 +69,7 @@ def test_finalize( c.create(agent_roles['ALICE'], agent_roles['BOB'], 400 * (10 ** 6)), ] - nonce_oracle = OverrideNonceOracle(agent_roles['ALICE'], 1) + nonce_oracle = StaticNonceOracle(1) c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=default_chain_spec.chain_id()) txs_rpc.append(c.create(agent_roles['ALICE'], agent_roles['BOB'], 500 * (10 ** 6))) @@ -80,8 +85,8 @@ def test_finalize( tx_hashes.append(tx_hash_hex) - if i < 4: - set_sent_status(tx_hash.hex()) + if i < 3: + set_sent_status(tx_hash_hex) i += 1 @@ -124,408 +129,449 @@ def test_finalize( assert not is_error_status(otx.status) -#def test_expired( -# default_chain_spec, -# init_database, -# init_w3, -# ): -# -# tx_hashes = [] -# for i in range(1, 6): -# tx = { -# 'from': init_w3.eth.accounts[0], -# 'to': init_w3.eth.accounts[1], -# 'nonce': 42 + int(i/2), -# 'gas': 21000, -# 'gasPrice': 1000000*i, -# 'value': 128, -# 'chainId': 666, -# 'data': '0x', -# } -# tx_signed = init_w3.eth.sign_transaction(tx) -# #tx_hash = RpcClient.w3.keccak(hexstr=tx_signed['raw']) -# tx_hash = init_w3.keccak(hexstr=tx_signed['raw']) -# queue_create(tx['nonce'], tx['from'], tx_hash.hex(), tx_signed['raw'], str(default_chain_spec)) -# cache_gas_refill_data(tx_hash.hex(), tx) -# tx_hashes.append(tx_hash.hex()) -# set_sent_status(tx_hash.hex(), False) -# otx = init_database.query(Otx).filter(Otx.tx_hash==tx_hash.hex()).first() -# fake_created = datetime.datetime.utcnow() - datetime.timedelta(seconds=40*i) -# otx.date_created = fake_created -# init_database.add(otx) -# init_database.commit() -# init_database.refresh(otx) -# -# now = datetime.datetime.utcnow() -# delta = datetime.timedelta(seconds=61) -# then = now - delta -# -# otxs = OtxSync.get_expired(then) -# nonce_acc = 0 -# for otx in otxs: -# nonce_acc += otx.nonce -# -# assert nonce_acc == (43 + 44) -# -# -#def test_get_paused( -# init_w3, -# init_database, -# cic_registry, -# default_chain_spec, -# ): -# -# tx_hashes = [] -# for i in range(1, 3): -# tx = { -# 'from': init_w3.eth.accounts[0], -# 'to': init_w3.eth.accounts[1], -# 'nonce': 42 + int(i), -# 'gas': 21000, -# 'gasPrice': 1000000*i, -# 'value': 128, -# 'chainId': 8995, -# 'data': '0x', -# } -# logg.debug('nonce {}'.format(tx['nonce'])) -# tx_signed = init_w3.eth.sign_transaction(tx) -# #tx_hash = RpcClient.w3.keccak(hexstr=tx_signed['raw']) -# tx_hash = init_w3.keccak(hexstr=tx_signed['raw']) -# queue_create(tx['nonce'], tx['from'], tx_hash.hex(), tx_signed['raw'], str(default_chain_spec)) -# cache_gas_refill_data(tx_hash.hex(), tx) -# tx_hashes.append(tx_hash.hex()) -# -# #txs = get_paused_txs(recipient=init_w3.eth.accounts[1]) -# txs = get_paused_txs(sender=init_w3.eth.accounts[0]) -# assert len(txs.keys()) == 0 -# -# q = init_database.query(Otx) -# q = q.filter(Otx.tx_hash==tx_hashes[0]) -# r = q.first() -# r.waitforgas(session=init_database) -# init_database.add(r) -# init_database.commit() -# -# chain_id = default_chain_spec.chain_id() -# txs = get_paused_txs(chain_id=chain_id) -# assert len(txs.keys()) == 1 -# -# #txs = get_paused_txs(recipient=init_w3.eth.accounts[1]) -# txs = get_paused_txs(sender=init_w3.eth.accounts[0], chain_id=chain_id) -# assert len(txs.keys()) == 1 -# -# txs = get_paused_txs(status=StatusEnum.WAITFORGAS) -# assert len(txs.keys()) == 1 -# -# #txs = get_paused_txs(recipient=init_w3.eth.accounts[1], status=StatusEnum.WAITFORGAS) -# txs = get_paused_txs(sender=init_w3.eth.accounts[0], status=StatusEnum.WAITFORGAS, chain_id=chain_id) -# assert len(txs.keys()) == 1 -# -# -# q = init_database.query(Otx) -# q = q.filter(Otx.tx_hash==tx_hashes[1]) -# o = q.first() -# o.waitforgas(session=init_database) -# init_database.add(o) -# init_database.commit() -# -# txs = get_paused_txs() -# assert len(txs.keys()) == 2 -# -# #txs = get_paused_txs(recipient=init_w3.eth.accounts[1]) -# txs = get_paused_txs(sender=init_w3.eth.accounts[0], chain_id=chain_id) -# assert len(txs.keys()) == 2 -# -# txs = get_paused_txs(status=StatusEnum.WAITFORGAS, chain_id=chain_id) -# assert len(txs.keys()) == 2 -# -# #txs = get_paused_txs(recipient=init_w3.eth.accounts[1], status=StatusEnum.WAITFORGAS) -# txs = get_paused_txs(sender=init_w3.eth.accounts[0], status=StatusEnum.WAITFORGAS, chain_id=chain_id) -# assert len(txs.keys()) == 2 -# -# -# q = init_database.query(Otx) -# q = q.filter(Otx.tx_hash==tx_hashes[1]) -# o = q.first() -# o.sendfail(session=init_database) -# init_database.add(o) -# init_database.commit() -# -# txs = get_paused_txs() -# assert len(txs.keys()) == 2 -# -# txs = get_paused_txs(sender=init_w3.eth.accounts[0], chain_id=chain_id) -# assert len(txs.keys()) == 2 -# -# txs = get_paused_txs(status=StatusEnum.WAITFORGAS, chain_id=chain_id) -# assert len(txs.keys()) == 1 -# -# #txs = get_paused_txs(recipient=init_w3.eth.accounts[1], status=StatusEnum.WAITFORGAS) -# txs = get_paused_txs(sender=init_w3.eth.accounts[0], status=StatusEnum.WAITFORGAS, chain_id=chain_id) -# assert len(txs.keys()) == 1 -# -# -#def test_get_upcoming( -# default_chain_spec, -# init_w3, -# init_database, -# cic_registry, -# ): -# -# tx_hashes = [] -# for i in range(0, 7): -# tx = { -# 'from': init_w3.eth.accounts[i % 3], -# 'to': init_w3.eth.accounts[1], -# 'nonce': 42 + int(i / 3), -# 'gas': 21000, -# 'gasPrice': 1000000*i, -# 'value': 128, -# 'chainId': 8995, -# 'data': '0x', -# } -# tx_signed = init_w3.eth.sign_transaction(tx) -# tx_hash = init_w3.keccak(hexstr=tx_signed['raw']) -# logg.debug('{} nonce {} {}'.format(i, tx['nonce'], tx_hash.hex())) -# queue_create(tx['nonce'], tx['from'], tx_hash.hex(), tx_signed['raw'], str(default_chain_spec)) -# cache_gas_refill_data(tx_hash.hex(), tx) -# tx_hashes.append(tx_hash.hex()) -# -# chain_id = int(default_chain_spec.chain_id()) -# -# txs = get_upcoming_tx(StatusEnum.PENDING, chain_id=chain_id) -# assert len(txs.keys()) == 3 -# -# tx = unpack_signed_raw_tx(bytes.fromhex(txs[tx_hashes[0]][2:]), chain_id) -# assert tx['nonce'] == 42 -# -# tx = unpack_signed_raw_tx(bytes.fromhex(txs[tx_hashes[1]][2:]), chain_id) -# assert tx['nonce'] == 42 -# -# tx = unpack_signed_raw_tx(bytes.fromhex(txs[tx_hashes[2]][2:]), chain_id) -# assert tx['nonce'] == 42 -# -# q = init_database.query(TxCache) -# q = q.filter(TxCache.sender==init_w3.eth.accounts[0]) -# for o in q.all(): -# o.date_checked -= datetime.timedelta(seconds=30) -# init_database.add(o) -# init_database.commit() -# -# before = datetime.datetime.now() - datetime.timedelta(seconds=20) -# logg.debug('before {}'.format(before)) -# txs = get_upcoming_tx(StatusEnum.PENDING, before=before) -# logg.debug('txs {} {}'.format(txs.keys(), txs.values())) -# assert len(txs.keys()) == 1 -# -# # Now date checked has been set to current time, and the check returns no results -# txs = get_upcoming_tx(StatusEnum.PENDING, before=before) -# logg.debug('txs {} {}'.format(txs.keys(), txs.values())) -# assert len(txs.keys()) == 0 -# -# set_sent_status(tx_hashes[0]) -# -# txs = get_upcoming_tx(StatusEnum.PENDING) -# assert len(txs.keys()) == 3 -# with pytest.raises(KeyError): -# tx = txs[tx_hashes[0]] -# -# tx = unpack_signed_raw_tx(bytes.fromhex(txs[tx_hashes[3]][2:]), chain_id) -# assert tx['nonce'] == 43 -# -# set_waitforgas(tx_hashes[1]) -# txs = get_upcoming_tx(StatusEnum.PENDING) -# assert len(txs.keys()) == 3 -# with pytest.raises(KeyError): -# tx = txs[tx_hashes[1]] -# -# tx = unpack_signed_raw_tx(bytes.fromhex(txs[tx_hashes[3]][2:]), chain_id) -# assert tx['nonce'] == 43 -# -# -# txs = get_upcoming_tx(StatusEnum.WAITFORGAS) -# assert len(txs.keys()) == 1 -# -# -#def test_upcoming_with_lock( -# default_chain_spec, -# init_database, -# init_w3, -# ): -# -# chain_id = int(default_chain_spec.chain_id()) -# chain_str = str(default_chain_spec) -# -# tx = { -# 'from': init_w3.eth.accounts[0], -# 'to': init_w3.eth.accounts[1], -# 'nonce': 42, -# 'gas': 21000, -# 'gasPrice': 1000000, -# 'value': 128, -# 'chainId': 8995, -# 'data': '0x', -# } -# tx_signed = init_w3.eth.sign_transaction(tx) -# tx_hash = init_w3.keccak(hexstr=tx_signed['raw']) -# logg.debug('nonce {} {}'.format(tx['nonce'], tx_hash.hex())) -# queue_create(tx['nonce'], tx['from'], tx_hash.hex(), tx_signed['raw'], str(default_chain_spec)) -# cache_gas_refill_data(tx_hash.hex(), tx) -# -# txs = get_upcoming_tx(StatusEnum.PENDING, chain_id=chain_id) -# assert len(txs.keys()) == 1 -# -# Lock.set(chain_str, LockEnum.SEND, address=init_w3.eth.accounts[0]) -# -# txs = get_upcoming_tx(StatusEnum.PENDING, chain_id=chain_id) -# assert len(txs.keys()) == 0 -# -# tx = { -# 'from': init_w3.eth.accounts[1], -# 'to': init_w3.eth.accounts[0], -# 'nonce': 42, -# 'gas': 21000, -# 'gasPrice': 1000000, -# 'value': 128, -# 'chainId': 8995, -# 'data': '0x', -# } -# tx_signed = init_w3.eth.sign_transaction(tx) -# tx_hash = init_w3.keccak(hexstr=tx_signed['raw']) -# logg.debug('nonce {} {}'.format(tx['nonce'], tx_hash.hex())) -# queue_create(tx['nonce'], tx['from'], tx_hash.hex(), tx_signed['raw'], str(default_chain_spec)) -# cache_gas_refill_data(tx_hash.hex(), tx) -# -# txs = get_upcoming_tx(StatusEnum.PENDING, chain_id=chain_id) -# assert len(txs.keys()) == 1 -# -# -#def test_obsoletion( -# default_chain_spec, -# init_w3, -# init_database, -# ): -# -# tx_hashes = [] -# for i in range(0, 4): -# tx = { -# 'from': init_w3.eth.accounts[int(i/2)], -# 'to': init_w3.eth.accounts[1], -# 'nonce': 42 + int(i/3), -# 'gas': 21000, -# 'gasPrice': 1000000*i, -# 'value': 128, -# 'chainId': 8995, -# 'data': '0x', -# } -# -# logg.debug('nonce {}'.format(tx['nonce'])) -# tx_signed = init_w3.eth.sign_transaction(tx) -# tx_hash = init_w3.keccak(hexstr=tx_signed['raw']) -# queue_create(tx['nonce'], tx['from'], tx_hash.hex(), tx_signed['raw'], str(default_chain_spec)) -# cache_gas_refill_data(tx_hash.hex(), tx) -# tx_hashes.append(tx_hash.hex()) -# -# if i < 2: -# set_sent_status(tx_hash.hex()) -# -# session = SessionBase.create_session() -# q = session.query(Otx) -# q = q.filter(Otx.status.op('&')(StatusEnum.OBSOLETED.value)==StatusEnum.OBSOLETED.value) -# z = 0 -# for o in q.all(): -# z += o.nonce -# -# session.close() -# assert z == 42 -# -# set_final_status(tx_hashes[1], 362436, True) -# -# session = SessionBase.create_session() -# q = session.query(Otx) -# q = q.filter(Otx.status.op('&')(StatusEnum.CANCELLED.value)==StatusEnum.OBSOLETED.value) -# zo = 0 -# for o in q.all(): -# zo += o.nonce -# -# q = session.query(Otx) -# q = q.filter(Otx.status.op('&')(StatusEnum.CANCELLED.value)==StatusEnum.CANCELLED.value) -# zc = 0 -# for o in q.all(): -# zc += o.nonce -# -# session.close() -# assert zo == 0 -# assert zc == 42 -# -# -#def test_retry( -# init_database, -# ): -# -# address = '0x' + os.urandom(20).hex() -# tx_hash = '0x' + os.urandom(32).hex() -# signed_tx = '0x' + os.urandom(128).hex() -# otx = Otx(0, address, tx_hash, signed_tx) -# init_database.add(otx) -# init_database.commit() -# -# set_sent_status(tx_hash, True) -# set_ready(tx_hash) -# -# q = init_database.query(Otx) -# q = q.filter(Otx.tx_hash==tx_hash) -# otx = q.first() -# -# assert (otx.status & StatusEnum.RETRY.value) == StatusEnum.RETRY.value -# assert is_error_status(otx.status) -# -# set_sent_status(tx_hash, False) -# set_ready(tx_hash) -# -# init_database.commit() -# -# q = init_database.query(Otx) -# q = q.filter(Otx.tx_hash==tx_hash) -# otx = q.first() -# -# assert (otx.status & StatusEnum.RETRY.value) == StatusBits.QUEUED.value -# assert not is_error_status(otx.status) -# -# -#def test_get_account_tx( -# default_chain_spec, -# init_database, -# init_w3, -# ): -# -# tx_hashes = [] -# -# for i in range(0, 4): -# -# tx = { -# 'from': init_w3.eth.accounts[int(i/3)], -# 'to': init_w3.eth.accounts[3-i], -# 'nonce': 42 + i, -# 'gas': 21000, -# 'gasPrice': 1000000*i, -# 'value': 128, -# 'chainId': 666, -# 'data': '', -# } -# logg.debug('nonce {}'.format(tx['nonce'])) -# tx_signed = init_w3.eth.sign_transaction(tx) -# tx_hash = init_w3.keccak(hexstr=tx_signed['raw']) -# queue_create(tx['nonce'], tx['from'], tx_hash.hex(), tx_signed['raw'], str(default_chain_spec)) -# cache_gas_refill_data(tx_hash.hex(), tx) -# tx_hashes.append(tx_hash.hex()) -# -# txs = get_account_tx(init_w3.eth.accounts[0]) -# logg.debug('tx {} tx {}'.format(list(txs.keys()), tx_hashes)) -# assert list(txs.keys()) == tx_hashes -# -# txs = get_account_tx(init_w3.eth.accounts[0], as_recipient=False) -# assert list(txs.keys()) == tx_hashes[:3] -# -# txs = get_account_tx(init_w3.eth.accounts[0], as_sender=False) -# assert list(txs.keys()) == tx_hashes[3:] +def test_expired( + default_chain_spec, + init_database, + eth_rpc, + eth_signer, + agent_roles, + ): + + rpc = RPCConnection.connect(default_chain_spec, 'default') + nonce_oracle = StaticNonceOracle(42) + gas_oracle = RPCGasOracle(eth_rpc) + c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=default_chain_spec.chain_id()) + + txs_rpc = [ + c.create(agent_roles['ALICE'], agent_roles['BOB'], 100 * (10 ** 6)), + c.create(agent_roles['ALICE'], agent_roles['BOB'], 200 * (10 ** 6)), + ] + + nonce_oracle = StaticNonceOracle(43) + c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=default_chain_spec.chain_id()) + txs_rpc += [ + c.create(agent_roles['ALICE'], agent_roles['BOB'], 300 * (10 ** 6)), + c.create(agent_roles['ALICE'], agent_roles['BOB'], 400 * (10 ** 6)), + ] + + nonce_oracle = StaticNonceOracle(44) + c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=default_chain_spec.chain_id()) + txs_rpc.append(c.create(agent_roles['ALICE'], agent_roles['BOB'], 500 * (10 ** 6))) + + tx_hashes = [] + + i = 0 + for entry in txs_rpc: + tx_hash_hex = entry[0] + tx_rpc = entry[1] + tx_signed_raw_hex = tx_rpc['params'][0] + + register_tx(tx_hash_hex, tx_signed_raw_hex, default_chain_spec, None, session=init_database) + cache_gas_data(tx_hash_hex, tx_signed_raw_hex, default_chain_spec.asdict()) + + tx_hashes.append(tx_hash_hex) + + set_sent_status(tx_hash_hex, False) + + otx = init_database.query(Otx).filter(Otx.tx_hash==tx_hash_hex).first() + fake_created = datetime.datetime.utcnow() - datetime.timedelta(seconds=40*i) + otx.date_created = fake_created + init_database.add(otx) + init_database.commit() + init_database.refresh(otx) + + i += 1 + + now = datetime.datetime.utcnow() + delta = datetime.timedelta(seconds=61) + then = now - delta + + otxs = OtxSync.get_expired(then) + nonce_acc = 0 + for otx in otxs: + nonce_acc += otx.nonce + + assert nonce_acc == (43 + 44) + + +def test_get_paused( + init_database, + default_chain_spec, + eth_rpc, + eth_signer, + agent_roles, + ): + + chain_id = default_chain_spec.chain_id() + rpc = RPCConnection.connect(default_chain_spec, 'default') + nonce_oracle = OverrideNonceOracle(agent_roles['ALICE'], 42) + gas_oracle = RPCGasOracle(eth_rpc) + c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=default_chain_spec.chain_id()) + + txs_rpc = [ + c.create(agent_roles['ALICE'], agent_roles['BOB'], 100 * (10 ** 6)), + c.create(agent_roles['ALICE'], agent_roles['BOB'], 200 * (10 ** 6)), + ] + + tx_hashes = [] + for entry in txs_rpc: + tx_hash_hex = entry[0] + tx_rpc = entry[1] + tx_signed_raw_hex = tx_rpc['params'][0] + + register_tx(tx_hash_hex, tx_signed_raw_hex, default_chain_spec, None, session=init_database) + cache_gas_data(tx_hash_hex, tx_signed_raw_hex, default_chain_spec.asdict()) + + tx_hashes.append(tx_hash_hex) + + txs = get_paused_txs(sender=agent_roles['ALICE'], chain_id=chain_id) + assert len(txs.keys()) == 0 + + q = init_database.query(Otx) + q = q.filter(Otx.tx_hash==tx_hashes[0]) + r = q.first() + r.waitforgas(session=init_database) + init_database.add(r) + init_database.commit() + + chain_id = default_chain_spec.chain_id() + txs = get_paused_txs(chain_id=chain_id) + assert len(txs.keys()) == 1 + + txs = get_paused_txs(sender=agent_roles['ALICE'], chain_id=chain_id) # init_w3.eth.accounts[0]) + assert len(txs.keys()) == 1 + + txs = get_paused_txs(status=StatusBits.GAS_ISSUES) + assert len(txs.keys()) == 1 + + txs = get_paused_txs(sender=agent_roles['ALICE'], status=StatusBits.GAS_ISSUES, chain_id=chain_id) + assert len(txs.keys()) == 1 + + + q = init_database.query(Otx) + q = q.filter(Otx.tx_hash==tx_hashes[1]) + o = q.first() + o.waitforgas(session=init_database) + init_database.add(o) + init_database.commit() + + txs = get_paused_txs() + assert len(txs.keys()) == 2 + + txs = get_paused_txs(sender=agent_roles['ALICE'], chain_id=chain_id) # init_w3.eth.accounts[0]) + assert len(txs.keys()) == 2 + + txs = get_paused_txs(status=StatusBits.GAS_ISSUES, chain_id=chain_id) + assert len(txs.keys()) == 2 + + txs = get_paused_txs(sender=agent_roles['ALICE'], status=StatusBits.GAS_ISSUES, chain_id=chain_id) # init_w3.eth.accounts[0]) + assert len(txs.keys()) == 2 + + q = init_database.query(Otx) + q = q.filter(Otx.tx_hash==tx_hashes[1]) + o = q.first() + o.sendfail(session=init_database) + init_database.add(o) + init_database.commit() + + txs = get_paused_txs() + assert len(txs.keys()) == 2 + + txs = get_paused_txs(sender=agent_roles['ALICE'], chain_id=chain_id) # init_w3.eth.accounts[0]) + assert len(txs.keys()) == 2 + + txs = get_paused_txs(status=StatusBits.GAS_ISSUES, chain_id=chain_id) + txs = get_paused_txs(status=StatusEnum.WAITFORGAS, chain_id=chain_id) + assert len(txs.keys()) == 1 + + txs = get_paused_txs(sender=agent_roles['ALICE'], status=StatusBits.GAS_ISSUES, chain_id=chain_id) # init_w3.eth.accounts[0]) + assert len(txs.keys()) == 1 + + +def test_get_upcoming( + default_chain_spec, + eth_rpc, + eth_signer, + init_database, + agent_roles, + ): + + chain_id = default_chain_spec.chain_id() + rpc = RPCConnection.connect(default_chain_spec, 'default') + nonce_oracle = StaticNonceOracle(42) + gas_oracle = RPCGasOracle(eth_rpc) + c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=default_chain_spec.chain_id()) + + txs_rpc = [ + c.create(agent_roles['ALICE'], agent_roles['DAVE'], 100 * (10 ** 6)), + c.create(agent_roles['BOB'], agent_roles['DAVE'], 200 * (10 ** 6)), + c.create(agent_roles['CAROL'], agent_roles['DAVE'], 300 * (10 ** 6)), + ] + + nonce_oracle = StaticNonceOracle(43) + c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=default_chain_spec.chain_id()) + txs_rpc += [ + c.create(agent_roles['ALICE'], agent_roles['DAVE'], 400 * (10 ** 6)), + c.create(agent_roles['BOB'], agent_roles['DAVE'], 500 * (10 ** 6)), + c.create(agent_roles['CAROL'], agent_roles['DAVE'], 600 * (10 ** 6)), + ] + + nonce_oracle = StaticNonceOracle(44) + c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=default_chain_spec.chain_id()) + txs_rpc += [ + c.create(agent_roles['ALICE'], agent_roles['DAVE'], 700 * (10 ** 6)), + ] + + tx_hashes = [] + for entry in txs_rpc: + tx_hash_hex = entry[0] + tx_rpc = entry[1] + tx_signed_raw_hex = tx_rpc['params'][0] + + register_tx(tx_hash_hex, tx_signed_raw_hex, default_chain_spec, None, session=init_database) + cache_gas_data(tx_hash_hex, tx_signed_raw_hex, default_chain_spec.asdict()) + + tx_hashes.append(tx_hash_hex) + + set_ready(tx_hash_hex) + + txs = get_upcoming_tx(StatusBits.QUEUED, chain_id=chain_id) + assert len(txs.keys()) == 3 + + tx = unpack(bytes.fromhex(strip_0x(txs[tx_hashes[0]])), chain_id) + assert tx['nonce'] == 42 + + tx = unpack(bytes.fromhex(strip_0x(txs[tx_hashes[1]])), chain_id) + assert tx['nonce'] == 42 + + tx = unpack(bytes.fromhex(strip_0x(txs[tx_hashes[2]])), chain_id) + assert tx['nonce'] == 42 + + q = init_database.query(TxCache) + q = q.filter(TxCache.sender==agent_roles['ALICE']) + for o in q.all(): + o.date_checked -= datetime.timedelta(seconds=30) + init_database.add(o) + init_database.commit() + + before = datetime.datetime.now() - datetime.timedelta(seconds=20) + logg.debug('before {}'.format(before)) + txs = get_upcoming_tx(StatusBits.QUEUED, before=before) + logg.debug('txs {} {}'.format(txs.keys(), txs.values())) + assert len(txs.keys()) == 1 + + # Now date checked has been set to current time, and the check returns no results + txs = get_upcoming_tx(StatusBits.QUEUED, before=before) + logg.debug('txs {} {}'.format(txs.keys(), txs.values())) + assert len(txs.keys()) == 0 + + set_sent_status(tx_hashes[0]) + + txs = get_upcoming_tx(StatusBits.QUEUED) + assert len(txs.keys()) == 3 + with pytest.raises(KeyError): + tx = txs[tx_hashes[0]] + + tx = unpack(bytes.fromhex(strip_0x(txs[tx_hashes[3]])), chain_id) + assert tx['nonce'] == 43 + + set_waitforgas(tx_hashes[1]) + txs = get_upcoming_tx(StatusBits.QUEUED) + assert len(txs.keys()) == 3 + with pytest.raises(KeyError): + tx = txs[tx_hashes[1]] + + tx = unpack(bytes.fromhex(strip_0x(txs[tx_hashes[3]])), chain_id) + assert tx['nonce'] == 43 + + txs = get_upcoming_tx(StatusBits.GAS_ISSUES) + assert len(txs.keys()) == 1 + + +def test_upcoming_with_lock( + default_chain_spec, + init_database, + eth_rpc, + eth_signer, + agent_roles, + ): + + chain_id = int(default_chain_spec.chain_id()) + + rpc = RPCConnection.connect(default_chain_spec, 'default') + nonce_oracle = StaticNonceOracle(42) + gas_oracle = RPCGasOracle(eth_rpc) + c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=default_chain_spec.chain_id()) + + (tx_hash_hex, tx_rpc) = c.create(agent_roles['ALICE'], agent_roles['BOB'], 100 * (10 ** 6)) + tx_signed_raw_hex = tx_rpc['params'][0] + + register_tx(tx_hash_hex, tx_signed_raw_hex, default_chain_spec, None, session=init_database) + cache_gas_data(tx_hash_hex, tx_signed_raw_hex, default_chain_spec.asdict()) + + txs = get_upcoming_tx(StatusEnum.PENDING, chain_id=chain_id) + assert len(txs.keys()) == 1 + + Lock.set(str(default_chain_spec), LockEnum.SEND, address=agent_roles['ALICE']) + + txs = get_upcoming_tx(StatusEnum.PENDING, chain_id=chain_id) + assert len(txs.keys()) == 0 + + (tx_hash_hex, tx_rpc) = c.create(agent_roles['BOB'], agent_roles['ALICE'], 100 * (10 ** 6)) + tx_signed_raw_hex = tx_rpc['params'][0] + + register_tx(tx_hash_hex, tx_signed_raw_hex, default_chain_spec, None, session=init_database) + cache_gas_data(tx_hash_hex, tx_signed_raw_hex, default_chain_spec.asdict()) + + txs = get_upcoming_tx(StatusEnum.PENDING, chain_id=chain_id) + assert len(txs.keys()) == 1 + + +def test_obsoletion( + default_chain_spec, + init_database, + eth_rpc, + eth_signer, + agent_roles, + ): + + chain_id = default_chain_spec.chain_id() + rpc = RPCConnection.connect(default_chain_spec, 'default') + nonce_oracle = StaticNonceOracle(42) + gas_oracle = RPCGasOracle(eth_rpc) + c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=default_chain_spec.chain_id()) + + txs_rpc = [ + c.create(agent_roles['ALICE'], agent_roles['DAVE'], 100 * (10 ** 6)), + c.create(agent_roles['ALICE'], agent_roles['DAVE'], 200 * (10 ** 6)), + c.create(agent_roles['BOB'], agent_roles['DAVE'], 300 * (10 ** 6)), + ] + + nonce_oracle = StaticNonceOracle(43) + c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=default_chain_spec.chain_id()) + txs_rpc += [ + c.create(agent_roles['BOB'], agent_roles['DAVE'], 400 * (10 ** 6)), + ] + + tx_hashes = [] + i = 0 + for entry in txs_rpc: + tx_hash_hex = entry[0] + tx_rpc = entry[1] + tx_signed_raw_hex = tx_rpc['params'][0] + + register_tx(tx_hash_hex, tx_signed_raw_hex, default_chain_spec, None, session=init_database) + cache_gas_data(tx_hash_hex, tx_signed_raw_hex, default_chain_spec.asdict()) + + tx_hashes.append(tx_hash_hex) + + if i < 2: + set_sent_status(tx_hash_hex) + + i += 1 + + session = SessionBase.create_session() + q = session.query(Otx) + q = q.filter(Otx.status.op('&')(StatusEnum.OBSOLETED.value)==StatusEnum.OBSOLETED.value) + z = 0 + for o in q.all(): + z += o.nonce + + session.close() + assert z == 42 + + set_final_status(tx_hashes[1], 1023, True) + + session = SessionBase.create_session() + q = session.query(Otx) + q = q.filter(Otx.status.op('&')(StatusEnum.CANCELLED.value)==StatusEnum.OBSOLETED.value) + zo = 0 + for o in q.all(): + zo += o.nonce + + q = session.query(Otx) + q = q.filter(Otx.status.op('&')(StatusEnum.CANCELLED.value)==StatusEnum.CANCELLED.value) + zc = 0 + for o in q.all(): + zc += o.nonce + + session.close() + assert zo == 0 + assert zc == 42 + + +def test_retry( + init_database, + ): + + address = '0x' + os.urandom(20).hex() + tx_hash = '0x' + os.urandom(32).hex() + signed_tx = '0x' + os.urandom(128).hex() + otx = Otx(0, address, tx_hash, signed_tx) + init_database.add(otx) + init_database.commit() + + set_sent_status(tx_hash, True) + set_ready(tx_hash) + + q = init_database.query(Otx) + q = q.filter(Otx.tx_hash==tx_hash) + otx = q.first() + + assert (otx.status & StatusEnum.RETRY.value) == StatusEnum.RETRY.value + assert is_error_status(otx.status) + + set_sent_status(tx_hash, False) + set_ready(tx_hash) + + init_database.commit() + + q = init_database.query(Otx) + q = q.filter(Otx.tx_hash==tx_hash) + otx = q.first() + + assert (otx.status & StatusEnum.RETRY.value) == StatusBits.QUEUED.value + assert not is_error_status(otx.status) + + +def test_get_account_tx( + default_chain_spec, + init_database, + eth_rpc, + eth_signer, + agent_roles, + ): + + chain_id = default_chain_spec.chain_id() + rpc = RPCConnection.connect(default_chain_spec, 'default') + nonce_oracle = OverrideNonceOracle(ZERO_ADDRESS, 42) + gas_oracle = RPCGasOracle(eth_rpc) + c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=default_chain_spec.chain_id()) + + txs_rpc = [ + c.create(agent_roles['ALICE'], agent_roles['DAVE'], 100 * (10 ** 6)), + c.create(agent_roles['ALICE'], agent_roles['CAROL'], 200 * (10 ** 6)), + c.create(agent_roles['ALICE'], agent_roles['BOB'], 300 * (10 ** 6)), + c.create(agent_roles['BOB'], agent_roles['ALICE'], 300 * (10 ** 6)), + ] + + tx_hashes = [] + for entry in txs_rpc: + tx_hash_hex = entry[0] + tx_rpc = entry[1] + tx_signed_raw_hex = tx_rpc['params'][0] + + register_tx(tx_hash_hex, tx_signed_raw_hex, default_chain_spec, None, session=init_database) + cache_gas_data(tx_hash_hex, tx_signed_raw_hex, default_chain_spec.asdict()) + + tx_hashes.append(tx_hash_hex) + + txs = get_account_tx(agent_roles['ALICE']) + logg.debug('tx {} tx {}'.format(list(txs.keys()), tx_hashes)) + assert list(txs.keys()) == tx_hashes + + txs = get_account_tx(agent_roles['ALICE'], as_recipient=False) + assert list(txs.keys()) == tx_hashes[:3] + + txs = get_account_tx(agent_roles['ALICE'], as_sender=False) + assert list(txs.keys()) == tx_hashes[3:] diff --git a/apps/cic-eth/tests/util/nonce.py b/apps/cic-eth/tests/util/nonce.py new file mode 100644 index 00000000..c1466bfc --- /dev/null +++ b/apps/cic-eth/tests/util/nonce.py @@ -0,0 +1,12 @@ +# external imports +from chainlib.eth.nonce import OverrideNonceOracle +from chainlib.eth.constant import ZERO_ADDRESS + + +class StaticNonceOracle(OverrideNonceOracle): + + def __init__(self, nonce): + super(StaticNonceOracle, self).__init__(ZERO_ADDRESS, nonce) + + def next_nonce(self): + return self.nonce