From eb2f71aee0f48ea89db39f4c232388c20df7e7e8 Mon Sep 17 00:00:00 2001 From: PhilipWafula Date: Mon, 14 Jun 2021 10:38:23 +0300 Subject: [PATCH] Enable api level setting of queue value. --- apps/cic-notify/cic_notify/api.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/apps/cic-notify/cic_notify/api.py b/apps/cic-notify/cic_notify/api.py index f879a6e..fb27f7b 100644 --- a/apps/cic-notify/cic_notify/api.py +++ b/apps/cic-notify/cic_notify/api.py @@ -26,7 +26,7 @@ def get_sms_queue_tasks(app, task_prefix='cic_notify.tasks.sms.'): for q in qs[host]: if re.match(re_q, q['name']): host_queues.append((host, q['name'],)) - + task_prefix_len = len(task_prefix) queue_tasks = [] for (host, queue) in host_queues: @@ -35,17 +35,18 @@ def get_sms_queue_tasks(app, task_prefix='cic_notify.tasks.sms.'): for task in tasks: if len(task) >= task_prefix_len and task[:task_prefix_len] == task_prefix: queue_tasks.append((queue, task,)) - + return queue_tasks class Api: # TODO: Implement callback strategy - def __init__(self, queue='cic-notify'): + def __init__(self, queue=None): """ :param queue: The queue on which to execute notification tasks :type queue: str """ + self.queue = queue self.sms_tasks = get_sms_queue_tasks(app) logg.debug('sms tasks {}'.format(self.sms_tasks)) @@ -61,13 +62,19 @@ class Api: """ signatures = [] for q in self.sms_tasks: + + if not self.queue: + queue = q[0] + else: + queue = self.queue + signature = celery.signature( q[1], [ message, recipient, ], - queue=q[0], + queue=queue, ) signatures.append(signature)