cic-internal-integration/apps/cic-eth/tests/task/test_task_erc20.py

170 lines
4.3 KiB
Python

# standard imports
import logging
# external imports
import pytest
import celery
from eth_erc20 import ERC20
from chainlib.eth.nonce import RPCNonceOracle
from chainlib.eth.tx import (
receipt,
TxFormat,
)
# local imports
from cic_eth.queue.tx import register_tx
logg = logging.getLogger()
def test_otx_cache_transfer(
default_chain_spec,
foo_token,
token_roles,
agent_roles,
eth_signer,
eth_rpc,
init_database,
celery_session_worker,
):
nonce_oracle = RPCNonceOracle(token_roles['FOO_TOKEN_OWNER'], eth_rpc)
c = ERC20(default_chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle)
transfer_value = 100 * (10**6)
(tx_hash_hex, tx_signed_raw_hex) = c.transfer(foo_token, token_roles['FOO_TOKEN_OWNER'], agent_roles['ALICE'], transfer_value, tx_format=TxFormat.RLP_SIGNED)
register_tx(tx_hash_hex, tx_signed_raw_hex, default_chain_spec, None, session=init_database)
s = celery.signature(
'cic_eth.eth.erc20.cache_transfer_data',
[
tx_hash_hex,
tx_signed_raw_hex,
default_chain_spec.asdict(),
],
queue=None,
)
t = s.apply_async()
r = t.get()
assert r[0] == tx_hash_hex
def test_erc20_balance_task(
default_chain_spec,
foo_token,
token_roles,
agent_roles,
eth_signer,
eth_rpc,
celery_session_worker,
):
nonce_oracle = RPCNonceOracle(token_roles['FOO_TOKEN_OWNER'], eth_rpc)
c = ERC20(default_chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle)
transfer_value = 100 * (10**6)
(tx_hash_hex, o) = c.transfer(foo_token, token_roles['FOO_TOKEN_OWNER'], agent_roles['ALICE'], transfer_value)
eth_rpc.do(o)
o = receipt(tx_hash_hex)
r = eth_rpc.do(o)
assert r['status'] == 1
token_object = {
'address': foo_token,
}
s = celery.signature(
'cic_eth.eth.erc20.balance',
[
[token_object],
agent_roles['ALICE'],
default_chain_spec.asdict(),
],
queue=None,
)
t = s.apply_async()
r = t.get()
assert r[0]['balance_network'] == transfer_value
def test_erc20_transfer_task(
default_chain_spec,
foo_token,
agent_roles,
custodial_roles,
eth_signer,
eth_rpc,
init_database,
celery_session_worker,
):
token_object = {
'address': foo_token,
}
transfer_value = 100 * (10 ** 6)
s_nonce = celery.signature(
'cic_eth.eth.nonce.reserve_nonce',
[
[token_object],
default_chain_spec.asdict(),
custodial_roles['FOO_TOKEN_GIFTER'],
],
queue=None,
)
s_transfer = celery.signature(
'cic_eth.eth.erc20.transfer',
[
custodial_roles['FOO_TOKEN_GIFTER'],
agent_roles['ALICE'],
transfer_value,
default_chain_spec.asdict(),
],
queue=None,
)
s_nonce.link(s_transfer)
t = s_nonce.apply_async()
r = t.get_leaf()
logg.debug('result {}'.format(r))
def test_erc20_approve_task(
default_chain_spec,
foo_token,
agent_roles,
custodial_roles,
eth_signer,
eth_rpc,
init_database,
celery_session_worker,
):
token_object = {
'address': foo_token,
}
transfer_value = 100 * (10 ** 6)
s_nonce = celery.signature(
'cic_eth.eth.nonce.reserve_nonce',
[
[token_object],
default_chain_spec.asdict(),
custodial_roles['FOO_TOKEN_GIFTER'],
],
queue=None,
)
s_transfer = celery.signature(
'cic_eth.eth.erc20.approve',
[
custodial_roles['FOO_TOKEN_GIFTER'],
agent_roles['ALICE'],
transfer_value,
default_chain_spec.asdict(),
],
queue=None,
)
s_nonce.link(s_transfer)
t = s_nonce.apply_async()
r = t.get_leaf()
logg.debug('result {}'.format(r))