cic-stack/apps/cic-eth/tests/task/api/test_list.py

133 lines
3.5 KiB
Python
Raw Permalink Normal View History

# standard imports
import logging
2021-04-04 14:40:59 +02:00
# external imports
import pytest
from chainlib.eth.nonce import RPCNonceOracle
from eth_erc20 import ERC20
from chainlib.eth.tx import receipt
2021-10-07 17:12:35 +02:00
from hexathon import strip_0x
2021-04-04 14:40:59 +02:00
# local imports
from cic_eth.api.api_task import Api
from cic_eth.db.models.nonce import (
Nonce,
NonceReservation,
)
2021-04-04 14:40:59 +02:00
# test imports
2021-07-08 17:28:04 +02:00
from cic_eth.pytest.mock.filter import (
2021-04-04 14:40:59 +02:00
block_filter,
tx_filter,
)
logg = logging.getLogger()
def test_list_tx(
default_chain_spec,
init_database,
cic_registry,
eth_rpc,
eth_signer,
custodial_roles,
agent_roles,
foo_token,
register_tokens,
2021-10-07 17:12:35 +02:00
register_lookups,
init_eth_tester,
2021-04-25 16:54:54 +02:00
celery_session_worker,
2021-10-07 17:12:35 +02:00
init_celery_tasks,
):
tx_hashes = []
# external tx
nonce_oracle = RPCNonceOracle(custodial_roles['FOO_TOKEN_GIFTER'], eth_rpc)
nonce = nonce_oracle.get_nonce()
q = init_database.query(Nonce)
q = q.filter(Nonce.address_hex==agent_roles['ALICE'])
o = q.first()
o.nonce = nonce
init_database.add(o)
init_database.commit()
# TODO: implement cachenonceoracle instead, this is useless
# external tx one
Nonce.next(custodial_roles['FOO_TOKEN_GIFTER'], 'foo', session=init_database)
init_database.commit()
init_eth_tester.mine_blocks(13)
2021-04-04 14:40:59 +02:00
c = ERC20(default_chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle)
(tx_hash_hex, o) = c.transfer(foo_token, custodial_roles['FOO_TOKEN_GIFTER'], agent_roles['ALICE'], 1024)
eth_rpc.do(o)
o = receipt(tx_hash_hex)
r = eth_rpc.do(o)
assert r['status'] == 1
2021-10-07 17:12:35 +02:00
a = r['block_number']
2021-10-07 17:12:35 +02:00
ab = a.to_bytes(4, 'big')
block_filter.add(ab)
2021-10-07 17:12:35 +02:00
bb = r['transaction_index'].to_bytes(4, 'big')
cb = ab + bb
tx_filter.add(cb)
2021-10-07 17:12:35 +02:00
tx_hashes.append(strip_0x(tx_hash_hex))
# external tx two
Nonce.next(agent_roles['ALICE'], 'foo', session=init_database)
init_database.commit()
init_eth_tester.mine_blocks(13)
nonce_oracle = RPCNonceOracle(agent_roles['ALICE'], eth_rpc)
2021-04-04 14:40:59 +02:00
c = ERC20(default_chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle)
(tx_hash_hex, o) = c.transfer(foo_token, agent_roles['ALICE'], agent_roles['BOB'], 256)
eth_rpc.do(o)
o = receipt(tx_hash_hex)
r = eth_rpc.do(o)
assert r['status'] == 1
2021-10-07 17:12:35 +02:00
a = r['block_number']
2021-10-07 17:12:35 +02:00
ab = a.to_bytes(4, 'big')
block_filter.add(ab)
2021-10-07 17:12:35 +02:00
bb = r['transaction_index'].to_bytes(4, 'big')
cb = ab + bb
tx_filter.add(cb)
2021-10-07 17:12:35 +02:00
tx_hashes.append(strip_0x(tx_hash_hex))
init_eth_tester.mine_blocks(28)
# custodial tx 1
api = Api(str(default_chain_spec), queue=None)
2021-10-07 17:12:35 +02:00
t = api.transfer(agent_roles['ALICE'], agent_roles['CAROL'], 64, 'FOO')
r = t.get_leaf()
assert t.successful()
tx_hashes.append(r)
# custodial tx 2
api = Api(str(default_chain_spec), queue=None)
2021-10-07 17:12:35 +02:00
t = api.transfer(agent_roles['ALICE'], agent_roles['DAVE'], 16, 'FOO')
r = t.get_leaf()
assert t.successful()
tx_hashes.append(r)
logg.debug('r {}'.format(r))
# test the api
2021-07-08 17:28:04 +02:00
t = api.list(agent_roles['ALICE'], external_task='cic_eth.pytest.mock.filter.filter')
r = t.get_leaf()
assert t.successful()
assert len(r) == 3
logg.debug('rrrr {}'.format(r))
2021-10-07 17:12:35 +02:00
logg.debug('testing against hashes {}'.format(tx_hashes))
for tx in r:
logg.debug('have tx {}'.format(tx))
2021-10-07 17:12:35 +02:00
tx_hashes.remove(strip_0x(tx['hash']))
assert len(tx_hashes) == 1