cic-stack/apps/cic-eth/tests/fixtures_celery.py

61 lines
1.5 KiB
Python
Raw Normal View History

2021-02-01 18:12:51 +01:00
# third-party imports
import pytest
import tempfile
import logging
import shutil
logg = logging.getLogger(__name__)
# celery fixtures
@pytest.fixture(scope='session')
def celery_includes():
return [
'cic_eth.eth.bancor',
'cic_eth.eth.token',
'cic_eth.eth.request',
'cic_eth.eth.tx',
'cic_eth.ext.tx',
2021-02-01 18:12:51 +01:00
'cic_eth.queue.tx',
2021-02-17 11:04:21 +01:00
'cic_eth.queue.balance',
2021-02-01 18:12:51 +01:00
'cic_eth.admin.ctrl',
'cic_eth.admin.nonce',
'cic_eth.eth.account',
'cic_eth.callbacks.noop',
'cic_eth.callbacks.http',
'tests.mock.filter',
2021-02-01 18:12:51 +01:00
]
@pytest.fixture(scope='session')
def celery_config():
bq = tempfile.mkdtemp()
bp = tempfile.mkdtemp()
rq = tempfile.mkdtemp()
logg.debug('celery broker queue {} processed {}'.format(bq, bp))
logg.debug('celery backend store {}'.format(rq))
yield {
'broker_url': 'filesystem://',
'broker_transport_options': {
'data_folder_in': bq,
'data_folder_out': bq,
'data_folder_processed': bp,
},
'result_backend': 'file://{}'.format(rq),
}
logg.debug('cleaning up celery filesystem backend files {} {} {}'.format(bq, bp, rq))
shutil.rmtree(bq)
shutil.rmtree(bp)
shutil.rmtree(rq)
@pytest.fixture(scope='session')
def celery_worker_parameters():
return {
# 'queues': ('cic-eth'),
}
@pytest.fixture(scope='session')
def celery_enable_logging():
return True