cic-stack/apps/cic-eth/tests/task/test_task_address.py

68 lines
2.0 KiB
Python
Raw Normal View History

2021-03-29 15:35:51 +02:00
# external imports
2021-07-15 00:02:59 +02:00
import celery
2021-03-29 15:35:51 +02:00
from chainlib.eth.nonce import RPCNonceOracle
from chainlib.eth.tx import (
receipt,
)
from eth_address_declarator import Declarator
2021-03-29 15:35:51 +02:00
from hexathon import add_0x
# local imports
from cic_eth.ext.address import translate_tx_addresses
def test_translate(
default_chain_spec,
address_declarator,
eth_signer,
eth_rpc,
contract_roles,
agent_roles,
cic_registry,
init_celery_tasks,
2021-05-31 17:34:16 +02:00
register_lookups,
2021-07-15 00:02:59 +02:00
celery_session_worker,
2021-03-29 15:35:51 +02:00
):
nonce_oracle = RPCNonceOracle(contract_roles['CONTRACT_DEPLOYER'], eth_rpc)
c = Declarator(default_chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle)
2021-03-29 15:35:51 +02:00
description = 'alice'.encode('utf-8').ljust(32, b'\x00').hex()
(tx_hash_hex, o) = c.add_declaration(address_declarator, contract_roles['CONTRACT_DEPLOYER'], agent_roles['ALICE'], add_0x(description))
eth_rpc.do(o)
o = receipt(tx_hash_hex)
r = eth_rpc.do(o)
assert r['status'] == 1
description = 'bob'.encode('utf-8').ljust(32, b'\x00').hex()
(tx_hash_hex, o) = c.add_declaration(address_declarator, contract_roles['CONTRACT_DEPLOYER'], agent_roles['BOB'], add_0x(description))
eth_rpc.do(o)
o = receipt(tx_hash_hex)
r = eth_rpc.do(o)
assert r['status'] == 1
tx = {
'sender': agent_roles['ALICE'],
'sender_label': None,
'recipient': agent_roles['BOB'],
'recipient_label': None,
}
2021-07-15 00:02:59 +02:00
#tx = translate_tx_addresses(tx, [contract_roles['CONTRACT_DEPLOYER']], default_chain_spec.asdict())
s = celery.signature(
'cic_eth.ext.address.translate_tx_addresses',
[
tx,
[contract_roles['CONTRACT_DEPLOYER']],
default_chain_spec.asdict(),
],
queue=None,
)
t = s.apply_async()
r = t.get_leaf()
assert t.successful()
assert r['sender_label'] == 'alice'
assert r['recipient_label'] == 'bob'