Move pytest fixtures to importable location, add allowance check for transfer from

This commit is contained in:
nolash
2021-06-29 08:21:11 +02:00
parent 18382a1f35
commit 6a45d5faa7
9 changed files with 246 additions and 9 deletions

View File

@@ -17,11 +17,11 @@ root_dir = os.path.dirname(script_dir)
sys.path.insert(0, root_dir)
# assemble fixtures
from tests.fixtures_config import *
from tests.fixtures_database import *
from tests.fixtures_celery import *
from tests.fixtures_role import *
from tests.fixtures_contract import *
from cic_eth.pytest.fixtures_config import *
from cic_eth.pytest.fixtures_celery import *
from cic_eth.pytest.fixtures_database import *
from cic_eth.pytest.fixtures_role import *
from cic_eth.pytest.fixtures_contract import *
from chainlib.eth.pytest import *
from eth_contract_registry.pytest import *
from cic_eth_registry.pytest.fixtures_contracts import *

View File

@@ -1,73 +0,0 @@
# external imports
import pytest
import tempfile
import logging
import shutil
# local impors
from cic_eth.task import BaseTask
#logg = logging.getLogger(__name__)
logg = logging.getLogger()
@pytest.fixture(scope='function')
def init_celery_tasks(
contract_roles,
):
BaseTask.call_address = contract_roles['DEFAULT']
# celery fixtures
@pytest.fixture(scope='session')
def celery_includes():
return [
'cic_eth.eth.erc20',
'cic_eth.eth.tx',
'cic_eth.ext.tx',
'cic_eth.queue.tx',
'cic_eth.queue.lock',
'cic_eth.queue.query',
'cic_eth.queue.state',
'cic_eth.queue.balance',
'cic_eth.admin.ctrl',
'cic_eth.admin.nonce',
'cic_eth.admin.debug',
'cic_eth.admin.token',
'cic_eth.eth.account',
'cic_eth.callbacks.noop',
'cic_eth.callbacks.http',
'tests.mock.filter',
]
@pytest.fixture(scope='session')
def celery_config():
bq = tempfile.mkdtemp()
bp = tempfile.mkdtemp()
rq = tempfile.mkdtemp()
logg.debug('celery broker session queue {} processed {}'.format(bq, bp))
logg.debug('celery backend session store {}'.format(rq))
yield {
'broker_url': 'filesystem://',
'broker_transport_options': {
'data_folder_in': bq,
'data_folder_out': bq,
'data_folder_processed': bp,
},
'result_backend': 'file://{}'.format(rq),
}
logg.debug('cleaning up celery session filesystem backend files {} {} {}'.format(bq, bp, rq))
shutil.rmtree(bq)
shutil.rmtree(bp)
shutil.rmtree(rq)
@pytest.fixture(scope='session')
def celery_worker_parameters():
return {
# 'queues': ('celery'),
}
@pytest.fixture(scope='session')
def celery_enable_logging():
return True

View File

@@ -1,27 +0,0 @@
# standard imports
import os
import logging
# third-party imports
import pytest
import confini
script_dir = os.path.dirname(os.path.realpath(__file__))
root_dir = os.path.dirname(script_dir)
logg = logging.getLogger(__file__)
@pytest.fixture(scope='session')
def load_config():
config_dir = os.path.join(root_dir, 'config/test')
conf = confini.Config(config_dir, 'CICTEST')
conf.process()
logg.debug('config {}'.format(conf))
return conf
@pytest.fixture(scope='session')
def config(
load_config
):
return load_config

View File

@@ -1,77 +0,0 @@
# standard imports
import os
# external imports
import pytest
from chainlib.eth.contract import (
ABIContractEncoder,
ABIContractType,
)
from chainlib.eth.nonce import RPCNonceOracle
from chainlib.eth.gas import OverrideGasOracle
from chainlib.eth.block import (
block_latest,
block_by_number,
Block,
)
from chainlib.eth.tx import (
receipt,
TxFactory,
TxFormat,
unpack,
Tx,
)
from hexathon import strip_0x
script_dir = os.path.dirname(os.path.realpath(__file__))
root_dir = os.path.dirname(script_dir)
@pytest.fixture(scope='function')
def bogus_tx_block(
default_chain_spec,
eth_rpc,
eth_signer,
contract_roles,
):
nonce_oracle = RPCNonceOracle(contract_roles['CONTRACT_DEPLOYER'], conn=eth_rpc)
gas_oracle = OverrideGasOracle(limit=2000000, conn=eth_rpc)
f = open(os.path.join(script_dir, 'testdata', 'Bogus.bin'), 'r')
bytecode = f.read()
f.close()
c = TxFactory(default_chain_spec, signer=eth_signer, gas_oracle=gas_oracle, nonce_oracle=nonce_oracle)
tx = c.template(contract_roles['CONTRACT_DEPLOYER'], None, use_nonce=True)
tx = c.set_code(tx, bytecode)
(tx_hash_hex, o) = c.build(tx)
r = eth_rpc.do(o)
o = receipt(tx_hash_hex)
r = eth_rpc.do(o)
contract_address = r['contract_address']
enc = ABIContractEncoder()
enc.method('poke')
data = enc.get()
tx = c.template(contract_roles['CONTRACT_DEPLOYER'], contract_address, use_nonce=True)
tx = c.set_code(tx, data)
(tx_hash_hex, o) = c.finalize(tx, TxFormat.JSONRPC)
r = eth_rpc.do(o)
tx_signed_raw_hex = strip_0x(o['params'][0])
o = block_latest()
r = eth_rpc.do(o)
o = block_by_number(r, include_tx=False)
r = eth_rpc.do(o)
block = Block(r)
block.txs = [tx_hash_hex]
tx_signed_raw_bytes = bytes.fromhex(strip_0x(tx_signed_raw_hex))
tx_src = unpack(tx_signed_raw_bytes, default_chain_spec)
tx = Tx(tx_src, block=block)
return (block, tx)

View File

@@ -1,61 +0,0 @@
# standard imports
import os
import logging
# external imports
import pytest
import alembic
from alembic.config import Config as AlembicConfig
# local imports
from cic_eth.db import SessionBase
from cic_eth.db import dsn_from_config
logg = logging.getLogger(__file__)
@pytest.fixture(scope='session')
def database_engine(
load_config,
):
if load_config.get('DATABASE_ENGINE') == 'sqlite':
try:
os.unlink(load_config.get('DATABASE_NAME'))
except FileNotFoundError:
pass
SessionBase.transactional = False
SessionBase.poolable = False
dsn = dsn_from_config(load_config)
#SessionBase.connect(dsn, True)
SessionBase.connect(dsn, debug=load_config.get('DATABASE_DEBUG') != None)
return dsn
@pytest.fixture(scope='function')
def init_database(
load_config,
database_engine,
):
rootdir = os.path.dirname(os.path.dirname(__file__))
dbdir = os.path.join(rootdir, 'cic_eth', 'db')
migrationsdir = os.path.join(dbdir, 'migrations', load_config.get('DATABASE_ENGINE'))
if not os.path.isdir(migrationsdir):
migrationsdir = os.path.join(dbdir, 'migrations', 'default')
logg.info('using migrations directory {}'.format(migrationsdir))
session = SessionBase.create_session()
ac = AlembicConfig(os.path.join(migrationsdir, 'alembic.ini'))
ac.set_main_option('sqlalchemy.url', database_engine)
ac.set_main_option('script_location', migrationsdir)
alembic.command.downgrade(ac, 'base')
alembic.command.upgrade(ac, 'head')
session.execute('DELETE FROM lock')
session.commit()
yield session
session.commit()
session.close()

View File

@@ -1,62 +0,0 @@
# standard imports
import logging
# external imports
import pytest
from hexathon import add_0x
from chainlib.eth.address import to_checksum_address
# local imports
from cic_eth.db.models.role import AccountRole
from cic_eth.db.models.nonce import Nonce
#logg = logging.getLogger(__name__)
# what the actual fuck, debug is not being shown even though explicitly set
logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger()
@pytest.fixture(scope='function')
def init_custodial(
contract_roles,
token_roles,
agent_roles,
init_database,
):
for roles in [contract_roles, token_roles, agent_roles]:
for role in roles.values():
Nonce.init(role, session=init_database)
init_database.commit()
@pytest.fixture(scope='function')
def custodial_roles(
init_custodial,
contract_roles,
token_roles,
agent_roles,
eth_accounts,
eth_keystore,
init_database,
):
r = {}
r.update(contract_roles)
r.update(agent_roles)
r.update({
'GAS_GIFTER': eth_accounts[10],
'FOO_TOKEN_GIFTER': token_roles['FOO_TOKEN_OWNER'],
})
for k in r.keys():
role = AccountRole.set(k, r[k])
init_database.add(role)
logg.info('adding role {} -> {}'.format(k, r[k]))
init_database.commit()
return r
@pytest.fixture(scope='function')
def whoever(
init_eth_tester,
):
return init_eth_tester.new_account()