cic-internal-integration/apps/cic-notify/cic_notify/runnable/tasker.py

123 lines
3.6 KiB
Python

# standard imports
import os
import logging
import importlib
import argparse
import tempfile
# third-party imports
import celery
import confini
# local imports
from cic_notify.db.models.base import SessionBase
from cic_notify.db import dsn_from_config
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
config_dir = os.path.join('/usr/local/etc/cic-notify')
argparser = argparse.ArgumentParser()
argparser.add_argument('-c', type=str, default=config_dir, help='config file')
argparser.add_argument('-q', type=str, default='cic-notify', help='queue name for worker tasks')
argparser.add_argument('-v', action='store_true', help='be verbose')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('-vv', action='store_true', help='be more verbose')
args = argparser.parse_args()
if args.vv:
logging.getLogger().setLevel(logging.DEBUG)
elif args.v:
logging.getLogger().setLevel(logging.INFO)
config = confini.Config(args.c, args.env_prefix)
config.process()
config.add(args.q, '_CELERY_QUEUE', True)
config.censor('API_KEY', 'AFRICASTALKING')
config.censor('API_USERNAME', 'AFRICASTALKING')
config.censor('PASSWORD', 'DATABASE')
logg.debug('config loaded from {}:\n{}'.format(args.c, config))
# connect to database
dsn = dsn_from_config(config)
SessionBase.connect(dsn)
# verify database connection with minimal sanity query
session = SessionBase.create_session()
session.execute('select version_num from alembic_version')
session.close()
# set up celery
app = celery.Celery(__name__)
broker = config.get('CELERY_BROKER_URL')
if broker[:4] == 'file':
bq = tempfile.mkdtemp()
bp = tempfile.mkdtemp()
app.conf.update({
'broker_url': broker,
'broker_transport_options': {
'data_folder_in': bq,
'data_folder_out': bq,
'data_folder_processed': bp,
},
},
)
logg.warning('celery broker dirs queue i/o {} processed {}, will NOT be deleted on shutdown'.format(bq, bp))
else:
app.conf.update({
'broker_url': broker,
})
result = config.get('CELERY_RESULT_URL')
if result[:4] == 'file':
rq = tempfile.mkdtemp()
app.conf.update({
'result_backend': 'file://{}'.format(rq),
})
logg.warning('celery backend store dir {} created, will NOT be deleted on shutdown'.format(rq))
else:
app.conf.update({
'result_backend': result,
})
for key in config.store.keys():
if key[:5] == 'TASKS':
logg.info(f'adding sms task from {key}')
module = importlib.import_module(config.store[key])
if key == 'TASKS_AFRICASTALKING':
africastalking_notifier = module.AfricasTalkingNotifier
api_sender_id = config.get('AFRICASTALKING_API_SENDER_ID')
logg.debug(f'SENDER ID VALUE IS: {api_sender_id}')
if not api_sender_id:
api_sender_id = None
logg.debug(f'SENDER ID RESOLVED TO NONE: {api_sender_id}')
africastalking_notifier.initialize(
config.get('AFRICASTALKING_API_USERNAME'),
config.get('AFRICASTALKING_API_KEY'),
api_sender_id
)
def main():
argv = ['worker']
if args.vv:
argv.append('--loglevel=DEBUG')
elif args.v:
argv.append('--loglevel=INFO')
argv.append('-Q')
argv.append(args.q)
argv.append('-n')
argv.append(args.q)
app.worker_main(argv)
if __name__ == '__main__':
main()