Add allowance and transfer from test
This commit is contained in:
parent
75c6609dff
commit
407b29cef5
@ -13,6 +13,7 @@ from chainlib.eth.tx import (
|
||||
|
||||
# local imports
|
||||
from cic_eth.queue.tx import register_tx
|
||||
from cic_eth.error import YouAreBrokeError
|
||||
|
||||
logg = logging.getLogger()
|
||||
|
||||
@ -167,3 +168,101 @@ def test_erc20_approve_task(
|
||||
r = t.get_leaf()
|
||||
|
||||
logg.debug('result {}'.format(r))
|
||||
|
||||
|
||||
def test_erc20_transfer_from_task(
|
||||
default_chain_spec,
|
||||
foo_token,
|
||||
agent_roles,
|
||||
custodial_roles,
|
||||
eth_signer,
|
||||
eth_rpc,
|
||||
init_database,
|
||||
celery_session_worker,
|
||||
token_roles,
|
||||
):
|
||||
|
||||
token_object = {
|
||||
'address': foo_token,
|
||||
}
|
||||
transfer_value = 100 * (10 ** 6)
|
||||
|
||||
nonce_oracle = RPCNonceOracle(token_roles['FOO_TOKEN_OWNER'], conn=eth_rpc)
|
||||
c = ERC20(default_chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle)
|
||||
(tx_hash, o) = c.approve(foo_token, token_roles['FOO_TOKEN_OWNER'], agent_roles['ALICE'], transfer_value)
|
||||
r = eth_rpc.do(o)
|
||||
o = receipt(tx_hash)
|
||||
r = eth_rpc.do(o)
|
||||
assert r['status'] == 1
|
||||
|
||||
s_nonce = celery.signature(
|
||||
'cic_eth.eth.nonce.reserve_nonce',
|
||||
[
|
||||
[token_object],
|
||||
default_chain_spec.asdict(),
|
||||
custodial_roles['FOO_TOKEN_GIFTER'],
|
||||
],
|
||||
queue=None,
|
||||
)
|
||||
s_transfer = celery.signature(
|
||||
'cic_eth.eth.erc20.transfer_from',
|
||||
[
|
||||
custodial_roles['FOO_TOKEN_GIFTER'],
|
||||
agent_roles['BOB'],
|
||||
transfer_value,
|
||||
default_chain_spec.asdict(),
|
||||
agent_roles['ALICE'],
|
||||
],
|
||||
queue=None,
|
||||
)
|
||||
s_nonce.link(s_transfer)
|
||||
t = s_nonce.apply_async()
|
||||
r = t.get_leaf()
|
||||
|
||||
logg.debug('result {}'.format(r))
|
||||
|
||||
|
||||
def test_erc20_allowance_check_task(
|
||||
default_chain_spec,
|
||||
foo_token,
|
||||
agent_roles,
|
||||
custodial_roles,
|
||||
eth_signer,
|
||||
eth_rpc,
|
||||
init_database,
|
||||
celery_session_worker,
|
||||
token_roles,
|
||||
):
|
||||
|
||||
token_object = {
|
||||
'address': foo_token,
|
||||
'symbol': 'FOO',
|
||||
}
|
||||
transfer_value = 100 * (10 ** 6)
|
||||
|
||||
s_check = celery.signature(
|
||||
'cic_eth.eth.erc20.check_allowance',
|
||||
[
|
||||
[token_object],
|
||||
custodial_roles['FOO_TOKEN_GIFTER'],
|
||||
transfer_value,
|
||||
default_chain_spec.asdict(),
|
||||
agent_roles['ALICE']
|
||||
],
|
||||
queue=None,
|
||||
)
|
||||
t = s_check.apply_async()
|
||||
with pytest.raises(YouAreBrokeError):
|
||||
t.get()
|
||||
|
||||
nonce_oracle = RPCNonceOracle(token_roles['FOO_TOKEN_OWNER'], conn=eth_rpc)
|
||||
c = ERC20(default_chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle)
|
||||
(tx_hash, o) = c.approve(foo_token, token_roles['FOO_TOKEN_OWNER'], agent_roles['ALICE'], transfer_value)
|
||||
r = eth_rpc.do(o)
|
||||
o = receipt(tx_hash)
|
||||
r = eth_rpc.do(o)
|
||||
assert r['status'] == 1
|
||||
|
||||
t = s_check.apply_async()
|
||||
t.get()
|
||||
assert t.successful()
|
||||
|
Loading…
Reference in New Issue
Block a user