diff --git a/apps/cic-eth/tests/unit/ext/test_address.py b/apps/cic-eth/tests/unit/ext/test_address.py new file mode 100644 index 00000000..ecfde959 --- /dev/null +++ b/apps/cic-eth/tests/unit/ext/test_address.py @@ -0,0 +1,50 @@ +# external imports +from chainlib.eth.nonce import RPCNonceOracle +from chainlib.eth.tx import ( + receipt, + ) +from eth_address_declarator import AddressDeclarator +from hexathon import add_0x + +# local imports +from cic_eth.ext.address import translate_tx_addresses + + +def test_translate( + default_chain_spec, + address_declarator, + eth_signer, + eth_rpc, + contract_roles, + agent_roles, + cic_registry, + init_celery_tasks, + ): + + nonce_oracle = RPCNonceOracle(contract_roles['CONTRACT_DEPLOYER'], eth_rpc) + + c = AddressDeclarator(signer=eth_signer, nonce_oracle=nonce_oracle, chain_id=default_chain_spec.chain_id()) + + description = 'alice'.encode('utf-8').ljust(32, b'\x00').hex() + (tx_hash_hex, o) = c.add_declaration(address_declarator, contract_roles['CONTRACT_DEPLOYER'], agent_roles['ALICE'], add_0x(description)) + eth_rpc.do(o) + o = receipt(tx_hash_hex) + r = eth_rpc.do(o) + assert r['status'] == 1 + + description = 'bob'.encode('utf-8').ljust(32, b'\x00').hex() + (tx_hash_hex, o) = c.add_declaration(address_declarator, contract_roles['CONTRACT_DEPLOYER'], agent_roles['BOB'], add_0x(description)) + eth_rpc.do(o) + o = receipt(tx_hash_hex) + r = eth_rpc.do(o) + assert r['status'] == 1 + + tx = { + 'sender': agent_roles['ALICE'], + 'sender_label': None, + 'recipient': agent_roles['BOB'], + 'recipient_label': None, + } + tx = translate_tx_addresses(tx, [contract_roles['CONTRACT_DEPLOYER']], default_chain_spec.asdict()) + assert tx['sender_label'] == 'alice' + assert tx['recipient_label'] == 'bob' diff --git a/apps/cic-eth/tests/unit/ext/test_ext_tx.py b/apps/cic-eth/tests/unit/ext/test_ext_tx.py new file mode 100644 index 00000000..d942c4ac --- /dev/null +++ b/apps/cic-eth/tests/unit/ext/test_ext_tx.py @@ -0,0 +1,96 @@ +# standard imports +import logging + +# external imports +import celery +import moolb +from chainlib.eth.tx import ( + count, + receipt, + ) +from chainlib.eth.erc20 import ERC20 +from chainlib.eth.nonce import RPCNonceOracle + +# local imports +from cic_eth.db.models.nonce import ( + NonceReservation, + Nonce, + ) + +logg = logging.getLogger() + + +# TODO: This test fails when not run alone. Identify which fixture leaves a dirty state +def test_filter_process( + init_database, + default_chain_spec, + init_eth_tester, + eth_rpc, + eth_signer, + agent_roles, + init_custodial, + cic_registry, + foo_token, + celery_session_worker, + ): + + b = moolb.Bloom(1024, 3) + t = moolb.Bloom(1024, 3) + + tx_hashes = [] + + # external tx + nonce_oracle = RPCNonceOracle(agent_roles['ALICE'], eth_rpc) + + init_eth_tester.mine_blocks(13) + c = ERC20(signer=eth_signer, nonce_oracle=nonce_oracle) + (tx_hash_hex, o) = c.transfer(foo_token, agent_roles['ALICE'], agent_roles['BOB'], 1024) + eth_rpc.do(o) + o = receipt(tx_hash_hex) + r = eth_rpc.do(o) + a = r['block_number'] + b.add(a.to_bytes(4, 'big')) + a = r['block_number'] + r['transaction_index'] + t.add(a.to_bytes(4, 'big')) + tx_hashes.append(tx_hash_hex) + + # external tx + init_eth_tester.mine_blocks(28) + c = ERC20(signer=eth_signer, nonce_oracle=nonce_oracle) + (tx_hash_hex, o) = c.transfer(foo_token, agent_roles['ALICE'], agent_roles['BOB'], 512) + eth_rpc.do(o) + o = receipt(tx_hash_hex) + r = eth_rpc.do(o) + a = r['block_number'] + b.add(a.to_bytes(4, 'big')) + a = r['block_number'] + r['transaction_index'] + t.add(a.to_bytes(4, 'big')) + tx_hashes.append(tx_hash_hex) + + init_eth_tester.mine_blocks(10) + + o = { + 'alg': 'sha256', + 'filter_rounds': 3, + 'low': 0, + 'high': 50, + 'block_filter': b.to_bytes().hex(), + 'blocktx_filter': t.to_bytes().hex(), + } + + s = celery.signature( + 'cic_eth.ext.tx.list_tx_by_bloom', + [ + o, + agent_roles['BOB'], + default_chain_spec.asdict(), + ], + queue=None + ) + t = s.apply_async() + r = t.get() + + assert len(r) == 2 + for tx_hash in r.keys(): + tx_hashes.remove(tx_hash) + assert len(tx_hashes) == 0