From af97b7799f547980d1cea5138dd1a2772675ffea Mon Sep 17 00:00:00 2001 From: nolash Date: Tue, 6 Apr 2021 20:06:19 +0200 Subject: [PATCH] Enhance task, queue discovery --- apps/cic-notify/cic_notify/api.py | 58 +++++++++++++++++++++---------- apps/cic-notify/requirements.txt | 2 +- apps/cic-notify/setup.cfg | 3 +- 3 files changed, 43 insertions(+), 20 deletions(-) diff --git a/apps/cic-notify/cic_notify/api.py b/apps/cic-notify/cic_notify/api.py index 789eb835..b80af71f 100644 --- a/apps/cic-notify/cic_notify/api.py +++ b/apps/cic-notify/cic_notify/api.py @@ -3,6 +3,7 @@ import logging import re # third-party imports +from celery.app.control import Inspect import celery # local imports @@ -15,6 +16,30 @@ logg = logging.getLogger() sms_tasks_matcher = r"^(cic_notify.tasks.sms)(\.\w+)?" +re_q = r'^cic-notify' +def get_sms_queue_tasks(app, task_prefix='cic_notify.tasks.sms.'): + host_queues = [] + + i = Inspect(app=app) + qs = i.active_queues() + for host in qs.keys(): + for q in qs[host]: + if re.match(re_q, q['name']): + host_queues.append((host, q['name'],)) + + task_prefix_len = len(task_prefix) + queue_tasks = [] + for (host, queue) in host_queues: + i = Inspect(app=app, destination=[host]) + for tasks in i.registered_tasks().values(): + for task in tasks: + logg.debug('q {}'.format(task)) + if len(task) >= task_prefix_len and task[:task_prefix_len] == task_prefix: + queue_tasks.append((queue, task,)) + + return queue_tasks + + class Api: # TODO: Implement callback strategy def __init__(self, queue='cic-notify'): @@ -22,17 +47,9 @@ class Api: :param queue: The queue on which to execute notification tasks :type queue: str """ - registered_tasks = app.tasks - self.sms_tasks = [] + self.sms_tasks = get_sms_queue_tasks(app) + logg.debug('sms tasks {}'.format(self.sms_tasks)) - for task in registered_tasks.keys(): - logg.debug(f'Found: {task} {registered_tasks[task]}') - match = re.match(sms_tasks_matcher, task) - if match: - self.sms_tasks.append(task) - - self.queue = queue - logg.info(f'api using queue: {self.queue}') def sms(self, message, recipient): """This function chains all sms tasks in order to send a message, log and persist said data to disk @@ -44,12 +61,17 @@ class Api: :rtype: Celery.Task """ signatures = [] - for task in self.sms_tasks: - signature = celery.signature(task) + for q in self.sms_tasks: + signature = celery.signature( + q[1], + [ + message, + recipient, + ], + queue=q[0], + ) signatures.append(signature) - signature_group = celery.group(signatures) - result = signature_group.apply_async( - args=[message, recipient], - queue=self.queue - ) - return result + + t = celery.group(signatures)() + + return t diff --git a/apps/cic-notify/requirements.txt b/apps/cic-notify/requirements.txt index 7a952e58..0b7de273 100644 --- a/apps/cic-notify/requirements.txt +++ b/apps/cic-notify/requirements.txt @@ -1 +1 @@ -cic_base[full_graph]~=0.1.2a46 \ No newline at end of file +cic_base[full_graph]~=0.1.2a61 diff --git a/apps/cic-notify/setup.cfg b/apps/cic-notify/setup.cfg index 4ef87f2c..9daf04a9 100644 --- a/apps/cic-notify/setup.cfg +++ b/apps/cic-notify/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = cic-notify -version= 0.4.0a2 +version= 0.4.0a3 description = CIC notifications service author = Louis Holbrook author_email = dev@holbrook.no @@ -45,3 +45,4 @@ testing = [options.entry_points] console_scripts = cic-notify-tasker = cic_notify.runnable.tasker:main + cic-notify-send = cic_notify.runnable.send:main