From 226699568f7066e43a2cbbc931f43840dd550d9d Mon Sep 17 00:00:00 2001 From: Louis Holbrook Date: Wed, 7 Apr 2021 06:31:26 +0000 Subject: [PATCH] Discover queue for external tasks --- apps/cic-notify/cic_notify/api.py | 57 +++++++++++----- apps/cic-notify/cic_notify/runnable/send.py | 76 +++++++++++++++++++++ apps/cic-notify/requirements.txt | 2 +- apps/cic-notify/setup.cfg | 3 +- 4 files changed, 118 insertions(+), 20 deletions(-) create mode 100644 apps/cic-notify/cic_notify/runnable/send.py diff --git a/apps/cic-notify/cic_notify/api.py b/apps/cic-notify/cic_notify/api.py index 789eb835..f879a6ee 100644 --- a/apps/cic-notify/cic_notify/api.py +++ b/apps/cic-notify/cic_notify/api.py @@ -3,6 +3,7 @@ import logging import re # third-party imports +from celery.app.control import Inspect import celery # local imports @@ -15,6 +16,29 @@ logg = logging.getLogger() sms_tasks_matcher = r"^(cic_notify.tasks.sms)(\.\w+)?" +re_q = r'^cic-notify' +def get_sms_queue_tasks(app, task_prefix='cic_notify.tasks.sms.'): + host_queues = [] + + i = Inspect(app=app) + qs = i.active_queues() + for host in qs.keys(): + for q in qs[host]: + if re.match(re_q, q['name']): + host_queues.append((host, q['name'],)) + + task_prefix_len = len(task_prefix) + queue_tasks = [] + for (host, queue) in host_queues: + i = Inspect(app=app, destination=[host]) + for tasks in i.registered_tasks().values(): + for task in tasks: + if len(task) >= task_prefix_len and task[:task_prefix_len] == task_prefix: + queue_tasks.append((queue, task,)) + + return queue_tasks + + class Api: # TODO: Implement callback strategy def __init__(self, queue='cic-notify'): @@ -22,17 +46,9 @@ class Api: :param queue: The queue on which to execute notification tasks :type queue: str """ - registered_tasks = app.tasks - self.sms_tasks = [] + self.sms_tasks = get_sms_queue_tasks(app) + logg.debug('sms tasks {}'.format(self.sms_tasks)) - for task in registered_tasks.keys(): - logg.debug(f'Found: {task} {registered_tasks[task]}') - match = re.match(sms_tasks_matcher, task) - if match: - self.sms_tasks.append(task) - - self.queue = queue - logg.info(f'api using queue: {self.queue}') def sms(self, message, recipient): """This function chains all sms tasks in order to send a message, log and persist said data to disk @@ -44,12 +60,17 @@ class Api: :rtype: Celery.Task """ signatures = [] - for task in self.sms_tasks: - signature = celery.signature(task) + for q in self.sms_tasks: + signature = celery.signature( + q[1], + [ + message, + recipient, + ], + queue=q[0], + ) signatures.append(signature) - signature_group = celery.group(signatures) - result = signature_group.apply_async( - args=[message, recipient], - queue=self.queue - ) - return result + + t = celery.group(signatures)() + + return t diff --git a/apps/cic-notify/cic_notify/runnable/send.py b/apps/cic-notify/cic_notify/runnable/send.py new file mode 100644 index 00000000..5126736e --- /dev/null +++ b/apps/cic-notify/cic_notify/runnable/send.py @@ -0,0 +1,76 @@ +# standard imports +import sys +import os +import logging +import argparse +import tempfile + +# external imports +import celery +import confini + +# local imports +from cic_notify.api import Api + +logging.basicConfig(level=logging.WARNING) +logg = logging.getLogger() + +config_dir = os.path.join('/usr/local/etc/cic-notify') + +argparser = argparse.ArgumentParser() +argparser.add_argument('-c', type=str, default=config_dir, help='config file') +argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration') +argparser.add_argument('-v', action='store_true', help='be verbose') +argparser.add_argument('-vv', action='store_true', help='be more verbose') +argparser.add_argument('recipient', type=str, help='notification recipient') +argparser.add_argument('message', type=str, help='message text') +args = argparser.parse_args() + +if args.vv: + logging.getLogger().setLevel(logging.DEBUG) +elif args.v: + logging.getLogger().setLevel(logging.INFO) + +config = confini.Config(args.c, args.env_prefix) +config.process() +config.censor('PASSWORD', 'DATABASE') +config.add(args.recipient, '_RECIPIENT', True) +config.add(args.message, '_MESSAGE', True) + +# set up celery +app = celery.Celery(__name__) + +broker = config.get('CELERY_BROKER_URL') +if broker[:4] == 'file': + bq = tempfile.mkdtemp() + bp = tempfile.mkdtemp() + app.conf.update({ + 'broker_url': broker, + 'broker_transport_options': { + 'data_folder_in': bq, + 'data_folder_out': bq, + 'data_folder_processed': bp, + }, + }, + ) + logg.warning('celery broker dirs queue i/o {} processed {}, will NOT be deleted on shutdown'.format(bq, bp)) +else: + app.conf.update({ + 'broker_url': broker, + }) + +result = config.get('CELERY_RESULT_URL') +if result[:4] == 'file': + rq = tempfile.mkdtemp() + app.conf.update({ + 'result_backend': 'file://{}'.format(rq), + }) + logg.warning('celery backend store dir {} created, will NOT be deleted on shutdown'.format(rq)) +else: + app.conf.update({ + 'result_backend': result, + }) + +if __name__ == '__main__': + a = Api() + t = a.sms(config.get('_RECIPIENT'), config.get('_MESSAGE')) diff --git a/apps/cic-notify/requirements.txt b/apps/cic-notify/requirements.txt index 7a952e58..0b7de273 100644 --- a/apps/cic-notify/requirements.txt +++ b/apps/cic-notify/requirements.txt @@ -1 +1 @@ -cic_base[full_graph]~=0.1.2a46 \ No newline at end of file +cic_base[full_graph]~=0.1.2a61 diff --git a/apps/cic-notify/setup.cfg b/apps/cic-notify/setup.cfg index 4ef87f2c..9daf04a9 100644 --- a/apps/cic-notify/setup.cfg +++ b/apps/cic-notify/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = cic-notify -version= 0.4.0a2 +version= 0.4.0a3 description = CIC notifications service author = Louis Holbrook author_email = dev@holbrook.no @@ -45,3 +45,4 @@ testing = [options.entry_points] console_scripts = cic-notify-tasker = cic_notify.runnable.tasker:main + cic-notify-send = cic_notify.runnable.send:main