Add account tx admin api test

This commit is contained in:
nolash 2021-05-28 12:35:43 +02:00
parent 0b5fb22b25
commit 6a60d97d70
Signed by untrusted user who does not match committer: lash
GPG Key ID: 21D2E7BB88C2A746

View File

@ -1,3 +1,8 @@
# standard imports
import logging
import io
import json
# external imports
import pytest
from chainlib.connection import RPCConnection
@ -5,6 +10,7 @@ from chainlib.eth.nonce import OverrideNonceOracle
from chainqueue.tx import create as queue_create
from chainlib.eth.tx import (
TxFormat,
Tx,
)
from chainlib.eth.block import block_latest
from chainlib.eth.gas import (
@ -24,6 +30,126 @@ from chainqueue.query import get_nonce_tx_cache
from cic_eth.api.api_admin import AdminApi
from cic_eth.eth.gas import cache_gas_data
logg = logging.getLogger()
def test_admin_api_account(
default_chain_spec,
init_database,
eth_rpc,
eth_signer,
agent_roles,
contract_roles,
celery_session_worker,
caplog,
):
nonce_oracle = OverrideNonceOracle(agent_roles['ALICE'], 42)
gas_oracle = OverrideGasOracle(limit=21000, conn=eth_rpc)
tx_hashes_alice = []
txs_alice = []
for i in range(3):
c = Gas(default_chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle)
(tx_hash_hex, tx_signed_raw_hex) = c.create(agent_roles['ALICE'], agent_roles['BOB'], 100 * (10 ** 6), tx_format=TxFormat.RLP_SIGNED)
queue_create(
default_chain_spec,
42+i,
agent_roles['ALICE'],
tx_hash_hex,
tx_signed_raw_hex,
session=init_database,
)
cache_gas_data(
tx_hash_hex,
tx_signed_raw_hex,
default_chain_spec.asdict(),
)
tx_hashes_alice.append(tx_hash_hex)
txs_alice.append(tx_signed_raw_hex)
init_database.commit()
nonce_oracle = OverrideNonceOracle(agent_roles['BOB'], 13)
tx_hashes_bob = []
txs_bob = []
for i in range(2):
c = Gas(default_chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle)
(tx_hash_hex, tx_signed_raw_hex) = c.create(agent_roles['BOB'], agent_roles['ALICE'], 100 * (10 ** 6), tx_format=TxFormat.RLP_SIGNED)
queue_create(
default_chain_spec,
13+i,
agent_roles['BOB'],
tx_hash_hex,
tx_signed_raw_hex,
session=init_database,
)
cache_gas_data(
tx_hash_hex,
tx_signed_raw_hex,
default_chain_spec.asdict(),
)
tx_hashes_bob.append(tx_hash_hex)
txs_bob.append(tx_signed_raw_hex)
init_database.commit()
api = AdminApi(eth_rpc, queue=None, call_address=contract_roles['CONTRACT_DEPLOYER'])
r = api.account(default_chain_spec, agent_roles['ALICE'])
assert len(r) == 5
api = AdminApi(eth_rpc, queue=None, call_address=contract_roles['CONTRACT_DEPLOYER'])
r = api.account(default_chain_spec, agent_roles['ALICE'], include_sender=False)
assert len(r) == 2
api = AdminApi(eth_rpc, queue=None, call_address=contract_roles['CONTRACT_DEPLOYER'])
r = api.account(default_chain_spec, agent_roles['ALICE'], include_recipient=False)
assert len(r) == 3
def test_admin_api_account_writer(
default_chain_spec,
init_database,
eth_rpc,
eth_signer,
agent_roles,
contract_roles,
celery_session_worker,
caplog,
):
nonce_oracle = OverrideNonceOracle(agent_roles['ALICE'], 42)
gas_oracle = OverrideGasOracle(limit=21000, conn=eth_rpc)
c = Gas(default_chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle)
(tx_hash_hex, tx_signed_raw_hex) = c.create(agent_roles['ALICE'], agent_roles['BOB'], 100 * (10 ** 6), tx_format=TxFormat.RLP_SIGNED)
queue_create(
default_chain_spec,
42,
agent_roles['ALICE'],
tx_hash_hex,
tx_signed_raw_hex,
session=init_database,
)
cache_gas_data(
tx_hash_hex,
tx_signed_raw_hex,
default_chain_spec.asdict(),
)
init_database.commit()
buf = io.StringIO()
api = AdminApi(eth_rpc, queue=None, call_address=contract_roles['CONTRACT_DEPLOYER'])
api.account(default_chain_spec, agent_roles['ALICE'], include_recipient=False, renderer=json.dumps, w=buf)
# TODO: improve eval
tx = json.loads(buf.getvalue())
assert tx['tx_hash'] == tx_hash_hex
def test_registry(
eth_rpc,
@ -92,7 +218,7 @@ def test_resend_inplace(
init_database.commit()
api = AdminApi(eth_rpc, queue=None, call_address=contract_roles['CONTRACT_DEPLOYER'])
t = api.resend(tx_hash_hex, default_chain_spec)
t = api.resend(tx_hash_hex, default_chain_spec, unlock=True)
r = t.get_leaf()
assert t.successful()