cic-stack/apps/cic-eth/cic_eth/pytest/fixtures_celery.py

78 lines
2.0 KiB
Python
Raw Permalink Normal View History

# external imports
2021-02-01 18:12:51 +01:00
import pytest
import tempfile
import logging
import shutil
2021-10-14 15:24:51 +02:00
# local imports
from cic_eth.task import BaseTask
# Log through the root logger (getLogger() with no name) rather than a
# module-scoped one; the module-scoped variant is kept disabled below.
#logg = logging.getLogger(__name__)
logg = logging.getLogger()
@pytest.fixture(scope='function')
def init_celery_tasks(
        contract_roles,
        ):
    """Configure ``BaseTask`` class attributes from the ``contract_roles`` fixture.

    ``BaseTask.call_address`` is set to the ``DEFAULT`` role, and
    ``BaseTask.trusted_addresses`` to a list holding the
    ``TRUSTED_DECLARATOR`` and ``CONTRACT_DEPLOYER`` roles, in that order.

    The values are assigned on the class itself and are not restored when
    the fixture goes out of scope.

    :param contract_roles: Mapping of role name to the value used for that role.
    """
    # Order matters: it is the order of the resulting trusted_addresses list.
    trusted_role_names = ('TRUSTED_DECLARATOR', 'CONTRACT_DEPLOYER')

    BaseTask.call_address = contract_roles['DEFAULT']
    BaseTask.trusted_addresses = [contract_roles[role] for role in trusted_role_names]
2021-02-01 18:12:51 +01:00
# celery fixtures
@pytest.fixture(scope='session')
def celery_includes():
    """Return the list of task module paths for the celery test session.

    NOTE(review): this is presumably the ``celery_includes`` override hook of
    celery's pytest plugin (modules imported when the embedded worker
    starts) -- confirm against the celery testing documentation.

    :returns: A new list of dotted module paths, in a fixed order.
    :rtype: list
    """
    task_modules = (
        # token and transaction tasks
        'cic_eth.eth.erc20',
        'cic_eth.eth.tx',
        'cic_eth.ext.tx',
        # queue tasks
        'cic_eth.queue.tx',
        'cic_eth.queue.lock',
        'cic_eth.queue.query',
        'cic_eth.queue.state',
        'cic_eth.queue.balance',
        # admin tasks
        'cic_eth.admin.ctrl',
        'cic_eth.admin.nonce',
        'cic_eth.admin.debug',
        'cic_eth.admin.token',
        # account and callback tasks
        'cic_eth.eth.account',
        'cic_eth.callbacks.noop',
        'cic_eth.callbacks.http',
        # test-only mocks
        'cic_eth.pytest.mock.filter',
        'cic_eth.pytest.mock.callback',
    )
    return list(task_modules)
@pytest.fixture(scope='session')
def celery_config():
    """Yield a celery configuration backed by temporary directories.

    Three temporary directories are created: one used as both the incoming
    and outgoing broker message folder, one for processed broker messages,
    and one for the file-based result backend. The configuration dict is
    yielded, and all three directories are removed afterwards.

    Cleanup runs in a ``finally`` clause and attempts every directory even
    if removing an earlier one fails, so a single failure does not leave the
    remaining directories behind. The first removal error, if any, is
    re-raised once all removals have been attempted.

    :raises OSError: If one or more of the temporary directories could not be removed.
    :returns: Generator yielding the celery configuration dict.
    """
    bq = tempfile.mkdtemp()
    bp = tempfile.mkdtemp()
    rq = tempfile.mkdtemp()
    logg.debug('celery broker session queue {} processed {}'.format(bq, bp))
    logg.debug('celery backend session store {}'.format(rq))

    try:
        yield {
            'broker_url': 'filesystem://',
            'broker_transport_options': {
                # Same folder for in and out: messages written by the
                # producer are read back from the same directory.
                'data_folder_in': bq,
                'data_folder_out': bq,
                'data_folder_processed': bp,
            },
            'result_backend': 'file://{}'.format(rq),
        }
    finally:
        logg.debug('cleaning up celery session filesystem backend files {} {} {}'.format(bq, bp, rq))
        first_error = None
        for path in (bq, bp, rq):
            try:
                shutil.rmtree(path)
            except OSError as e:
                logg.error('failed to remove celery session directory {}: {}'.format(path, e))
                # Remember only the first failure; keep removing the rest.
                if first_error is None:
                    first_error = e
        if first_error is not None:
            raise first_error
@pytest.fixture(scope='session')
def celery_worker_parameters():
    """Return the parameters for the celery test worker; currently none.

    :returns: An empty dict.
    :rtype: dict
    """
    # Disabled option kept for reference:
    # 'queues': ('celery'),
    worker_parameters = {}
    return worker_parameters
@pytest.fixture(scope='session')
def celery_enable_logging():
    """Return ``True`` for the session-wide ``celery_enable_logging`` fixture.

    NOTE(review): presumably read by celery's pytest plugin to turn on
    logging in the embedded worker -- confirm against the celery testing
    documentation.

    :rtype: bool
    """
    logging_enabled = True
    return logging_enabled