diff --git a/apps/cic-eth/tests/task/test_task_erc20.py b/apps/cic-eth/tests/task/test_task_erc20.py index d0024278..5574178c 100644 --- a/apps/cic-eth/tests/task/test_task_erc20.py +++ b/apps/cic-eth/tests/task/test_task_erc20.py @@ -13,6 +13,7 @@ from chainlib.eth.tx import ( # local imports from cic_eth.queue.tx import register_tx +from cic_eth.error import YouAreBrokeError logg = logging.getLogger() @@ -167,3 +168,101 @@ def test_erc20_approve_task( r = t.get_leaf() logg.debug('result {}'.format(r)) + + +def test_erc20_transfer_from_task( + default_chain_spec, + foo_token, + agent_roles, + custodial_roles, + eth_signer, + eth_rpc, + init_database, + celery_session_worker, + token_roles, + ): + + token_object = { + 'address': foo_token, + } + transfer_value = 100 * (10 ** 6) + + nonce_oracle = RPCNonceOracle(token_roles['FOO_TOKEN_OWNER'], conn=eth_rpc) + c = ERC20(default_chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle) + (tx_hash, o) = c.approve(foo_token, token_roles['FOO_TOKEN_OWNER'], agent_roles['ALICE'], transfer_value) + r = eth_rpc.do(o) + o = receipt(tx_hash) + r = eth_rpc.do(o) + assert r['status'] == 1 + + s_nonce = celery.signature( + 'cic_eth.eth.nonce.reserve_nonce', + [ + [token_object], + default_chain_spec.asdict(), + custodial_roles['FOO_TOKEN_GIFTER'], + ], + queue=None, + ) + s_transfer = celery.signature( + 'cic_eth.eth.erc20.transfer_from', + [ + custodial_roles['FOO_TOKEN_GIFTER'], + agent_roles['BOB'], + transfer_value, + default_chain_spec.asdict(), + agent_roles['ALICE'], + ], + queue=None, + ) + s_nonce.link(s_transfer) + t = s_nonce.apply_async() + r = t.get_leaf() + + logg.debug('result {}'.format(r)) + + +def test_erc20_allowance_check_task( + default_chain_spec, + foo_token, + agent_roles, + custodial_roles, + eth_signer, + eth_rpc, + init_database, + celery_session_worker, + token_roles, + ): + + token_object = { + 'address': foo_token, + 'symbol': 'FOO', + } + transfer_value = 100 * (10 ** 6) + + s_check = celery.signature( + 'cic_eth.eth.erc20.check_allowance', + [ + [token_object], + custodial_roles['FOO_TOKEN_GIFTER'], + transfer_value, + default_chain_spec.asdict(), + agent_roles['ALICE'] + ], + queue=None, + ) + t = s_check.apply_async() + with pytest.raises(YouAreBrokeError): + t.get() + + nonce_oracle = RPCNonceOracle(token_roles['FOO_TOKEN_OWNER'], conn=eth_rpc) + c = ERC20(default_chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle) + (tx_hash, o) = c.approve(foo_token, token_roles['FOO_TOKEN_OWNER'], agent_roles['ALICE'], transfer_value) + r = eth_rpc.do(o) + o = receipt(tx_hash) + r = eth_rpc.do(o) + assert r['status'] == 1 + + t = s_check.apply_async() + t.get() + assert t.successful()