@@ -9,6 +9,7 @@ sys.path.insert(0, root_dir)
|
||||
|
||||
from tests.fixtures_config import *
|
||||
from tests.fixtures_database import *
|
||||
from tests.fixtures_celery import *
|
||||
from tests.fixtures_role import *
|
||||
from chainlib.eth.pytest import *
|
||||
from contract_registry.pytest import *
|
||||
|
||||
@@ -11,6 +11,7 @@ logg = logging.getLogger()
|
||||
# celery fixtures
|
||||
@pytest.fixture(scope='session')
|
||||
def celery_includes():
|
||||
logg.debug('foooooooooooo ')
|
||||
return [
|
||||
# 'cic_eth.eth.bancor',
|
||||
'cic_eth.eth.erc20',
|
||||
|
||||
@@ -18,14 +18,15 @@ logg = logging.getLogger()
|
||||
@pytest.fixture(scope='function')
|
||||
def custodial_roles(
|
||||
contract_roles,
|
||||
token_roles,
|
||||
eth_accounts,
|
||||
init_database,
|
||||
):
|
||||
r = {}
|
||||
r.update(contract_roles)
|
||||
r.update({
|
||||
'DEFAULT': eth_accounts[0],
|
||||
'GAS_GIFTER': eth_accounts[3],
|
||||
'GAS_GIFTER': eth_accounts[10],
|
||||
'FOO_TOKEN_GIFTER': token_roles['FOO_TOKEN_OWNER'],
|
||||
})
|
||||
for k in r.keys():
|
||||
role = AccountRole.set(k, r[k])
|
||||
|
||||
@@ -1,22 +1,65 @@
|
||||
# standard imports
|
||||
import logging
|
||||
|
||||
# external imports
|
||||
import pytest
|
||||
import celery
|
||||
from chainlib.eth.erc20 import ERC20
|
||||
from chainlib.eth.nonce import RPCNonceOracle
|
||||
from chainlib.eth.tx import receipt
|
||||
from chainlib.eth.tx import (
|
||||
receipt,
|
||||
TxFormat,
|
||||
)
|
||||
|
||||
# local imports
|
||||
from cic_eth.queue.tx import register_tx
|
||||
|
||||
logg = logging.getLogger()
|
||||
|
||||
|
||||
def test_erc20_balance(
|
||||
def test_otx_cache_transfer(
|
||||
default_chain_spec,
|
||||
foo_token,
|
||||
token_roles,
|
||||
agent_roles,
|
||||
eth_signer,
|
||||
eth_rpc,
|
||||
celery_worker,
|
||||
init_database,
|
||||
celery_session_worker,
|
||||
):
|
||||
nonce_oracle = RPCNonceOracle(token_roles['FOO_TOKEN_OWNER'], eth_rpc)
|
||||
c = ERC20(signer=eth_signer, nonce_oracle=nonce_oracle, chain_id=default_chain_spec.chain_id())
|
||||
transfer_value = 100 * (10**6)
|
||||
(tx_hash_hex, tx_signed_raw_hex) = c.transfer(foo_token, token_roles['FOO_TOKEN_OWNER'], agent_roles['ALICE'], transfer_value, tx_format=TxFormat.RLP_SIGNED)
|
||||
register_tx(tx_hash_hex, tx_signed_raw_hex, default_chain_spec, None, session=init_database)
|
||||
|
||||
s = celery.signature(
|
||||
'cic_eth.eth.erc20.cache_transfer_data',
|
||||
[
|
||||
tx_hash_hex,
|
||||
tx_signed_raw_hex,
|
||||
default_chain_spec.asdict(),
|
||||
],
|
||||
queue=None,
|
||||
)
|
||||
t = s.apply_async()
|
||||
r = t.get()
|
||||
|
||||
assert r[0] == tx_hash_hex
|
||||
|
||||
|
||||
def test_erc20_balance_task(
|
||||
default_chain_spec,
|
||||
foo_token,
|
||||
token_roles,
|
||||
agent_roles,
|
||||
eth_signer,
|
||||
eth_rpc,
|
||||
celery_session_worker,
|
||||
):
|
||||
|
||||
nonce_oracle = RPCNonceOracle(token_roles['FOO_TOKEN_OWNER'], eth_rpc)
|
||||
c = ERC20(signer=eth_signer, nonce_oracle=nonce_oracle)
|
||||
c = ERC20(signer=eth_signer, nonce_oracle=nonce_oracle, chain_id=default_chain_spec.chain_id())
|
||||
transfer_value = 100 * (10**6)
|
||||
(tx_hash_hex, o) = c.transfer(foo_token, token_roles['FOO_TOKEN_OWNER'], agent_roles['ALICE'], transfer_value)
|
||||
eth_rpc.do(o)
|
||||
@@ -41,3 +84,84 @@ def test_erc20_balance(
|
||||
r = t.get()
|
||||
assert r[0]['balance_network'] == transfer_value
|
||||
|
||||
|
||||
def test_erc20_transfer_task(
|
||||
default_chain_spec,
|
||||
foo_token,
|
||||
agent_roles,
|
||||
custodial_roles,
|
||||
eth_signer,
|
||||
eth_rpc,
|
||||
init_database,
|
||||
celery_session_worker,
|
||||
):
|
||||
|
||||
token_object = {
|
||||
'address': foo_token,
|
||||
}
|
||||
transfer_value = 100 * (10 ** 6)
|
||||
|
||||
s_nonce = celery.signature(
|
||||
'cic_eth.eth.tx.reserve_nonce',
|
||||
[
|
||||
[token_object],
|
||||
custodial_roles['FOO_TOKEN_GIFTER'],
|
||||
],
|
||||
queue=None,
|
||||
)
|
||||
s_transfer = celery.signature(
|
||||
'cic_eth.eth.erc20.transfer',
|
||||
[
|
||||
custodial_roles['FOO_TOKEN_GIFTER'],
|
||||
agent_roles['ALICE'],
|
||||
transfer_value,
|
||||
default_chain_spec.asdict(),
|
||||
],
|
||||
queue=None,
|
||||
)
|
||||
s_nonce.link(s_transfer)
|
||||
t = s_nonce.apply_async()
|
||||
r = t.get_leaf()
|
||||
|
||||
logg.debug('result {}'.format(r))
|
||||
|
||||
|
||||
def test_erc20_approve_task(
|
||||
default_chain_spec,
|
||||
foo_token,
|
||||
agent_roles,
|
||||
custodial_roles,
|
||||
eth_signer,
|
||||
eth_rpc,
|
||||
init_database,
|
||||
celery_session_worker,
|
||||
):
|
||||
|
||||
token_object = {
|
||||
'address': foo_token,
|
||||
}
|
||||
transfer_value = 100 * (10 ** 6)
|
||||
|
||||
s_nonce = celery.signature(
|
||||
'cic_eth.eth.tx.reserve_nonce',
|
||||
[
|
||||
[token_object],
|
||||
custodial_roles['FOO_TOKEN_GIFTER'],
|
||||
],
|
||||
queue=None,
|
||||
)
|
||||
s_transfer = celery.signature(
|
||||
'cic_eth.eth.erc20.approve',
|
||||
[
|
||||
custodial_roles['FOO_TOKEN_GIFTER'],
|
||||
agent_roles['ALICE'],
|
||||
transfer_value,
|
||||
default_chain_spec.asdict(),
|
||||
],
|
||||
queue=None,
|
||||
)
|
||||
s_nonce.link(s_transfer)
|
||||
t = s_nonce.apply_async()
|
||||
r = t.get_leaf()
|
||||
|
||||
logg.debug('result {}'.format(r))
|
||||
|
||||
Reference in New Issue
Block a user