cic-stack/apps/cic-eth/tests/task/test_task_account.py
2021-05-19 15:11:08 +00:00

190 lines
4.6 KiB
Python

# standard imports
import os
import logging
import time
# third-party imports
import pytest
import celery
from chainlib.connection import RPCConnection
from chainlib.eth.nonce import RPCNonceOracle
from chainlib.eth.tx import receipt
from eth_accounts_index.registry import AccountRegistry
from hexathon import strip_0x
from chainqueue.db.enum import StatusEnum
from chainqueue.db.models.otx import Otx
# local imports
from cic_eth.error import OutOfGasError
from cic_eth.db.models.base import SessionBase
from cic_eth.db.models.nonce import Nonce
from cic_eth.db.models.role import AccountRole
logg = logging.getLogger()
def test_create_account(
default_chain_spec,
eth_rpc,
init_database,
celery_session_worker,
caplog,
):
s = celery.signature(
'cic_eth.eth.account.create',
[
'foo',
default_chain_spec.asdict(),
],
)
t = s.apply_async()
r = t.get()
session = SessionBase.create_session()
q = session.query(Nonce).filter(Nonce.address_hex==r)
o = q.first()
session.close()
assert o != None
assert o.nonce == 0
s = celery.signature(
'cic_eth.eth.account.have',
[
r,
default_chain_spec.asdict(),
],
)
t = s.apply_async()
assert r == t.get()
def test_register_account(
default_chain_spec,
account_registry,
init_database,
init_eth_tester,
eth_accounts,
eth_rpc,
cic_registry,
eth_empty_accounts,
custodial_roles,
call_sender,
celery_session_worker,
):
s_nonce = celery.signature(
'cic_eth.eth.nonce.reserve_nonce',
[
eth_empty_accounts[0],
default_chain_spec.asdict(),
custodial_roles['ACCOUNT_REGISTRY_WRITER'],
],
queue=None,
)
s_register = celery.signature(
'cic_eth.eth.account.register',
[
default_chain_spec.asdict(),
custodial_roles['ACCOUNT_REGISTRY_WRITER'],
],
queue=None,
)
s_nonce.link(s_register)
t = s_nonce.apply_async()
address = t.get()
for r in t.collect():
logg.debug('r {}'.format(r))
assert t.successful()
session = SessionBase.create_session()
o = session.query(Otx).first()
tx_signed_hex = o.signed_tx
session.close()
s_send = celery.signature(
'cic_eth.eth.tx.send',
[
[tx_signed_hex],
default_chain_spec.asdict(),
],
queue=None,
)
t = s_send.apply_async()
address = t.get()
r = t.collect()
t.successful()
init_eth_tester.mine_block()
c = AccountRegistry(default_chain_spec)
o = c.have(account_registry, eth_empty_accounts[0], sender_address=call_sender)
r = eth_rpc.do(o)
assert int(strip_0x(r), 16) == 1
def test_role_task(
default_chain_spec,
init_database,
celery_session_worker,
):
address = '0x' + os.urandom(20).hex()
role = AccountRole.set('foo', address)
init_database.add(role)
init_database.commit()
s = celery.signature(
'cic_eth.eth.account.role',
[
address,
default_chain_spec.asdict(),
],
)
t = s.apply_async()
r = t.get()
assert r == 'foo'
def test_gift(
init_database,
default_chain_spec,
contract_roles,
agent_roles,
account_registry,
faucet,
eth_rpc,
eth_signer,
init_celery_tasks,
cic_registry,
celery_session_worker,
):
nonce_oracle = RPCNonceOracle(contract_roles['ACCOUNT_REGISTRY_WRITER'], eth_rpc)
c = AccountRegistry(default_chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle)
(tx_hash_hex, o) = c.add(account_registry, contract_roles['ACCOUNT_REGISTRY_WRITER'], agent_roles['ALICE'])
eth_rpc.do(o)
o = receipt(tx_hash_hex)
r = eth_rpc.do(o)
assert r['status'] == 1
s_nonce = celery.signature(
'cic_eth.eth.nonce.reserve_nonce',
[
agent_roles['ALICE'],
default_chain_spec.asdict(),
],
queue=None,
)
s_gift = celery.signature(
'cic_eth.eth.account.gift',
[
default_chain_spec.asdict(),
],
queue=None,
)
s_nonce.link(s_gift)
t = s_nonce.apply_async()
r = t.get_leaf()
assert t.successful()