Enable api level setting of queue value.

This commit is contained in:
PhilipWafula 2021-06-14 10:38:23 +03:00
parent e5b1352970
commit eb2f71aee0
Signed by: mango-habanero
GPG Key ID: B00CE9034DA19FB7

View File

@ -26,7 +26,7 @@ def get_sms_queue_tasks(app, task_prefix='cic_notify.tasks.sms.'):
for q in qs[host]:
if re.match(re_q, q['name']):
host_queues.append((host, q['name'],))
task_prefix_len = len(task_prefix)
queue_tasks = []
for (host, queue) in host_queues:
@ -35,17 +35,18 @@ def get_sms_queue_tasks(app, task_prefix='cic_notify.tasks.sms.'):
for task in tasks:
if len(task) >= task_prefix_len and task[:task_prefix_len] == task_prefix:
queue_tasks.append((queue, task,))
return queue_tasks
class Api:
# TODO: Implement callback strategy
def __init__(self, queue='cic-notify'):
def __init__(self, queue=None):
"""
:param queue: The queue on which to execute notification tasks
:type queue: str
"""
self.queue = queue
self.sms_tasks = get_sms_queue_tasks(app)
logg.debug('sms tasks {}'.format(self.sms_tasks))
@ -61,13 +62,19 @@ class Api:
"""
signatures = []
for q in self.sms_tasks:
if not self.queue:
queue = q[0]
else:
queue = self.queue
signature = celery.signature(
q[1],
[
message,
recipient,
],
queue=q[0],
queue=queue,
)
signatures.append(signature)