cic-internal-integration/apps/cic-eth/tests/task/api/test_admin.py

295 lines
7.6 KiB
Python

# standard imports
import os
import logging
# external imports
import celery
import pytest
from chainlib.eth.tx import (
unpack,
TxFormat,
)
from chainlib.eth.nonce import RPCNonceOracle
from chainlib.eth.gas import Gas
from chainlib.eth.address import to_checksum_address
from hexathon import (
strip_0x,
add_0x,
)
# local imports
from cic_eth.api import AdminApi
from cic_eth.db.models.role import AccountRole
from cic_eth.db.models.otx import Otx
from cic_eth.db.models.tx import TxCache
from cic_eth.db.enum import (
StatusEnum,
StatusBits,
status_str,
LockEnum,
)
from cic_eth.error import InitializationError
from cic_eth.eth.tx import (
cache_gas_data,
)
#from cic_eth.eth.gas import cache_gas_tx
from cic_eth.queue.tx import (
create as queue_create,
get_tx,
)
logg = logging.getLogger()
#def test_resend_inplace(
# default_chain_spec,
# init_database,
# init_w3,
# celery_session_worker,
# ):
#
# chain_str = str(default_chain_spec)
# c = RpcClient(default_chain_spec)
#
# sigs = []
#
# gas_provider = c.gas_provider()
#
# s_nonce = celery.signature(
# 'cic_eth.eth.tx.reserve_nonce',
# [
# init_w3.eth.accounts[0],
# gas_provider,
# ],
# queue=None,
# )
# s_refill = celery.signature(
# 'cic_eth.eth.tx.refill_gas',
# [
# chain_str,
# ],
# queue=None,
# )
# s_nonce.link(s_refill)
# t = s_nonce.apply_async()
# t.get()
# for r in t.collect():
# pass
# assert t.successful()
#
# q = init_database.query(Otx)
# q = q.join(TxCache)
# q = q.filter(TxCache.recipient==init_w3.eth.accounts[0])
# o = q.first()
# tx_raw = o.signed_tx
#
# tx_dict = unpack_signed_raw_tx(bytes.fromhex(tx_raw[2:]), default_chain_spec.chain_id())
# gas_price_before = tx_dict['gasPrice']
#
# s = celery.signature(
# 'cic_eth.admin.ctrl.lock_send',
# [
# chain_str,
# init_w3.eth.accounts[0],
# ],
# queue=None,
# )
# t = s.apply_async()
# t.get()
# assert t.successful()
#
# api = AdminApi(c, queue=None)
# t = api.resend(tx_dict['hash'], chain_str, unlock=True)
# t.get()
# i = 0
# tx_hash_new_hex = None
# for r in t.collect():
# tx_hash_new_hex = r[1]
# assert t.successful()
#
# tx_raw_new = get_tx(tx_hash_new_hex)
# logg.debug('get {}'.format(tx_raw_new))
# tx_dict_new = unpack_signed_raw_tx(bytes.fromhex(tx_raw_new['signed_tx'][2:]), default_chain_spec.chain_id())
# assert tx_hash_new_hex != tx_dict['hash']
# assert tx_dict_new['gasPrice'] > gas_price_before
#
# tx_dict_after = get_tx(tx_dict['hash'])
#
# logg.debug('logggg {}'.format(status_str(tx_dict_after['status'])))
# assert tx_dict_after['status'] & StatusBits.MANUAL
#def test_check_fix_nonce(
# default_chain_spec,
# init_database,
# init_eth_account_roles,
# init_w3,
# eth_empty_accounts,
# celery_session_worker,
# ):
#
# chain_str = str(default_chain_spec)
#
# sigs = []
# for i in range(5):
# s = celery.signature(
# 'cic_eth.eth.tx.refill_gas',
# [
# eth_empty_accounts[i],
# chain_str,
# ],
# queue=None,
# )
# sigs.append(s)
#
# t = celery.group(sigs)()
# txs = t.get()
# assert t.successful()
#
# tx_hash = web3.Web3.keccak(hexstr=txs[2])
# c = RpcClient(default_chain_spec)
# api = AdminApi(c, queue=None)
# address = init_eth_account_roles['eth_account_gas_provider']
# nonce_spec = api.check_nonce(address)
# assert nonce_spec['nonce']['network'] == 0
# assert nonce_spec['nonce']['queue'] == 4
# assert nonce_spec['nonce']['blocking'] == None
#
# s_set = celery.signature(
# 'cic_eth.queue.tx.set_rejected',
# [
# tx_hash.hex(),
# ],
# queue=None,
# )
# t = s_set.apply_async()
# t.get()
# t.collect()
# assert t.successful()
#
#
# nonce_spec = api.check_nonce(address)
# assert nonce_spec['nonce']['blocking'] == 2
# assert nonce_spec['tx']['blocking'] == tx_hash.hex()
#
# t = api.fix_nonce(address, nonce_spec['nonce']['blocking'])
# t.get()
# t.collect()
# assert t.successful()
#
# for tx in txs[3:]:
# tx_hash = web3.Web3.keccak(hexstr=tx)
# tx_dict = get_tx(tx_hash.hex())
# assert tx_dict['status'] == StatusEnum.OVERRIDDEN
#
#
def test_have_account(
default_chain_spec,
custodial_roles,
init_celery_tasks,
eth_rpc,
celery_session_worker,
):
api = AdminApi(None, queue=None)
t = api.have_account(custodial_roles['ALICE'], default_chain_spec)
assert t.get() != None
bogus_address = add_0x(to_checksum_address(os.urandom(20).hex()))
api = AdminApi(None, queue=None)
t = api.have_account(bogus_address, default_chain_spec)
assert t.get() == None
def test_locking(
default_chain_spec,
init_database,
agent_roles,
init_celery_tasks,
celery_session_worker,
):
api = AdminApi(None, queue=None)
t = api.lock(default_chain_spec, agent_roles['ALICE'], LockEnum.SEND)
t.get()
t = api.get_lock()
r = t.get()
assert len(r) == 1
t = api.unlock(default_chain_spec, agent_roles['ALICE'], LockEnum.SEND)
t.get()
t = api.get_lock()
r = t.get()
assert len(r) == 0
def test_tag_account(
default_chain_spec,
init_database,
agent_roles,
eth_rpc,
init_celery_tasks,
celery_session_worker,
):
api = AdminApi(eth_rpc, queue=None)
t = api.tag_account('foo', agent_roles['ALICE'], default_chain_spec)
t.get()
t = api.tag_account('bar', agent_roles['BOB'], default_chain_spec)
t.get()
t = api.tag_account('bar', agent_roles['CAROL'], default_chain_spec)
t.get()
assert AccountRole.get_address('foo', init_database) == agent_roles['ALICE']
assert AccountRole.get_address('bar', init_database) == agent_roles['CAROL']
#def test_ready(
# init_database,
# agent_roles,
# eth_rpc,
# ):
#
# api = AdminApi(eth_rpc)
#
# with pytest.raises(InitializationError):
# api.ready()
#
# bogus_account = os.urandom(20)
# bogus_account_hex = '0x' + bogus_account.hex()
#
# api.tag_account('ETH_GAS_PROVIDER_ADDRESS', web3.Web3.toChecksumAddress(bogus_account_hex))
# with pytest.raises(KeyError):
# api.ready()
#
# api.tag_account('ETH_GAS_PROVIDER_ADDRESS', eth_empty_accounts[0])
# api.ready()
def test_tx(
default_chain_spec,
cic_registry,
init_database,
eth_rpc,
eth_signer,
agent_roles,
contract_roles,
celery_session_worker,
):
chain_id = default_chain_spec.chain_id()
nonce_oracle = RPCNonceOracle(agent_roles['ALICE'], eth_rpc)
c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, chain_id=chain_id)
(tx_hash_hex, tx_signed_raw_hex) = c.create(agent_roles['ALICE'], agent_roles['BOB'], 1024, tx_format=TxFormat.RLP_SIGNED)
tx = unpack(bytes.fromhex(strip_0x(tx_signed_raw_hex)), chain_id)
queue_create(tx['nonce'], agent_roles['ALICE'], tx_hash_hex, tx_signed_raw_hex, default_chain_spec, session=init_database)
cache_gas_data(tx_hash_hex, tx_signed_raw_hex, default_chain_spec.asdict())
api = AdminApi(eth_rpc, queue=None, call_address=contract_roles['DEFAULT'])
tx = api.tx(default_chain_spec, tx_hash=tx_hash_hex)
logg.warning('code missing to verify tx contents {}'.format(tx))