cic-internal-integration/apps/cic-eth/tests/task/api/test_app.py

172 lines
4.7 KiB
Python
Raw Normal View History

2021-02-01 18:12:51 +01:00
# standard imports
import os
import logging
import time
# external imports
2021-02-01 18:12:51 +01:00
import pytest
import celery
from cic_eth_registry.erc20 import ERC20Token
from chainlib.chain import ChainSpec
2021-05-31 17:34:16 +02:00
from eth_accounts_index import AccountsIndex
from chainlib.eth.tx import (
transaction,
)
2021-06-03 15:51:55 +02:00
from chainqueue.sql.state import (
2021-05-31 17:34:16 +02:00
set_reserved,
)
2021-02-01 18:12:51 +01:00
# local imports
2021-02-01 18:12:51 +01:00
from cic_eth.api import Api
2021-05-31 17:34:16 +02:00
from cic_eth.queue.query import get_tx
2021-02-01 18:12:51 +01:00
2021-05-31 17:34:16 +02:00
#logg = logging.getLogger(__name__)
logg = logging.getLogger()
2021-02-01 18:12:51 +01:00
2021-02-01 18:12:51 +01:00
def test_account_api(
default_chain_spec,
init_database,
init_eth_rpc,
account_registry,
custodial_roles,
2021-02-01 18:12:51 +01:00
celery_session_worker,
):
api = Api(str(default_chain_spec), callback_param='accounts', callback_task='cic_eth.callbacks.noop.noop', queue=None)
t = api.create_account('', register=False)
t.get_leaf()
2021-02-01 18:12:51 +01:00
assert t.successful()
2021-05-31 17:34:16 +02:00
def test_account_api_register(
default_chain_spec,
init_database,
account_registry,
faucet,
custodial_roles,
cic_registry,
register_lookups,
eth_rpc,
celery_session_worker,
):
api = Api(str(default_chain_spec), callback_param='accounts', callback_task='cic_eth.callbacks.noop.noop', queue=None)
t = api.create_account('')
register_tx_hash = t.get_leaf()
assert t.successful()
set_reserved(default_chain_spec, register_tx_hash, session=init_database)
tx = get_tx(default_chain_spec.asdict(), register_tx_hash)
s = celery.signature(
'cic_eth.eth.tx.send',
[
[tx['signed_tx']],
default_chain_spec.asdict(),
],
queue=None
)
t = s.apply_async()
r = t.get_leaf()
assert t.successful()
o = transaction(register_tx_hash)
tx_src = eth_rpc.do(o)
c = AccountsIndex(default_chain_spec)
address = c.parse_add_request(tx_src['data'])
o = c.have(account_registry, address[0], sender_address=custodial_roles['CONTRACT_DEPLOYER'])
r = eth_rpc.do(o)
assert c.parse_have(r)
2021-02-01 18:12:51 +01:00
def test_transfer_api(
default_chain_spec,
eth_rpc,
2021-02-01 18:12:51 +01:00
init_database,
foo_token,
custodial_roles,
agent_roles,
cic_registry,
2021-05-31 17:34:16 +02:00
token_registry,
2021-04-06 17:14:04 +02:00
register_lookups,
2021-02-01 18:12:51 +01:00
celery_session_worker,
2021-05-31 17:34:16 +02:00
register_tokens,
foo_token_symbol,
2021-02-01 18:12:51 +01:00
):
api = Api(str(default_chain_spec), callback_param='transfer', callback_task='cic_eth.callbacks.noop.noop', queue=None)
2021-05-31 17:34:16 +02:00
t = api.transfer(custodial_roles['FOO_TOKEN_GIFTER'], agent_roles['ALICE'], 1, foo_token_symbol)
t.get_leaf()
2021-02-01 18:12:51 +01:00
assert t.successful()
2021-03-06 18:55:51 +01:00
@pytest.mark.skip()
2021-02-01 18:12:51 +01:00
def test_convert_api(
default_chain_spec,
init_w3,
cic_registry,
init_database,
foo_token,
bar_token,
2021-02-01 18:12:51 +01:00
celery_session_worker,
):
token_alice = CICRegistry.get_address(default_chain_spec, bancor_tokens[0])
token_bob = CICRegistry.get_address(default_chain_spec, bancor_tokens[1])
api = Api(str(default_chain_spec), callback_param='convert', callback_task='cic_eth.callbacks.noop.noop', queue=None)
t = api.convert(custodial_roles['FOO_TOKEN_GIFTER'], 110, 100, foo_token_cache.symbol, bar_token_cache.symbol)
t.get_leaf()
2021-02-01 18:12:51 +01:00
assert t.successful()
2021-03-06 18:55:51 +01:00
@pytest.mark.skip()
2021-02-01 18:12:51 +01:00
def test_convert_transfer_api(
default_chain_spec,
init_w3,
cic_registry,
init_database,
bancor_registry,
bancor_tokens,
celery_session_worker,
):
token_alice = CICRegistry.get_address(default_chain_spec, bancor_tokens[0])
token_bob = CICRegistry.get_address(default_chain_spec, bancor_tokens[1])
api = Api(str(default_chain_spec), callback_param='convert_transfer', callback_task='cic_eth.callbacks.noop.noop', queue=None)
t = api.convert_transfer(init_w3.eth.accounts[2], init_w3.eth.accounts[4], 110, 100, token_alice.symbol(), token_bob.symbol())
t.get()
for r in t.collect():
print(r)
assert t.successful()
def test_refill_gas(
default_chain_spec,
init_database,
eth_empty_accounts,
init_eth_rpc,
custodial_roles,
celery_session_worker,
2021-02-01 18:12:51 +01:00
):
api = Api(str(default_chain_spec), callback_param='refill_gas', callback_task='cic_eth.callbacks.noop.noop', queue=None)
2021-02-01 18:12:51 +01:00
t = api.refill_gas(eth_empty_accounts[0])
t.get()
for r in t.collect():
print(r)
assert t.successful()
def test_ping(
default_chain_spec,
celery_session_worker,
):
api = Api(str(default_chain_spec), callback_param='ping', callback_task='cic_eth.callbacks.noop.noop', queue=None)
t = api.ping('pong')
t.get()
for r in t.collect():
print(r)
assert t.successful()