Enhance task, queue discovery

This commit is contained in:
nolash 2021-04-06 20:06:19 +02:00
parent 7728f38f14
commit af97b7799f
Signed by untrusted user who does not match committer: lash
GPG Key ID: 21D2E7BB88C2A746
3 changed files with 43 additions and 20 deletions

View File

@ -3,6 +3,7 @@ import logging
import re
# third-party imports
from celery.app.control import Inspect
import celery
# local imports
@ -15,6 +16,30 @@ logg = logging.getLogger()
sms_tasks_matcher = r"^(cic_notify.tasks.sms)(\.\w+)?"
re_q = r'^cic-notify'
def get_sms_queue_tasks(app, task_prefix='cic_notify.tasks.sms.'):
host_queues = []
i = Inspect(app=app)
qs = i.active_queues()
for host in qs.keys():
for q in qs[host]:
if re.match(re_q, q['name']):
host_queues.append((host, q['name'],))
task_prefix_len = len(task_prefix)
queue_tasks = []
for (host, queue) in host_queues:
i = Inspect(app=app, destination=[host])
for tasks in i.registered_tasks().values():
for task in tasks:
logg.debug('q {}'.format(task))
if len(task) >= task_prefix_len and task[:task_prefix_len] == task_prefix:
queue_tasks.append((queue, task,))
return queue_tasks
class Api:
# TODO: Implement callback strategy
def __init__(self, queue='cic-notify'):
@ -22,17 +47,9 @@ class Api:
:param queue: The queue on which to execute notification tasks
:type queue: str
"""
registered_tasks = app.tasks
self.sms_tasks = []
self.sms_tasks = get_sms_queue_tasks(app)
logg.debug('sms tasks {}'.format(self.sms_tasks))
for task in registered_tasks.keys():
logg.debug(f'Found: {task} {registered_tasks[task]}')
match = re.match(sms_tasks_matcher, task)
if match:
self.sms_tasks.append(task)
self.queue = queue
logg.info(f'api using queue: {self.queue}')
def sms(self, message, recipient):
"""This function chains all sms tasks in order to send a message, log and persist said data to disk
@ -44,12 +61,17 @@ class Api:
:rtype: Celery.Task
"""
signatures = []
for task in self.sms_tasks:
signature = celery.signature(task)
for q in self.sms_tasks:
signature = celery.signature(
q[1],
[
message,
recipient,
],
queue=q[0],
)
signatures.append(signature)
signature_group = celery.group(signatures)
result = signature_group.apply_async(
args=[message, recipient],
queue=self.queue
)
return result
t = celery.group(signatures)()
return t

View File

@ -1 +1 @@
cic_base[full_graph]~=0.1.2a46
cic_base[full_graph]~=0.1.2a61

View File

@ -1,6 +1,6 @@
[metadata]
name = cic-notify
version= 0.4.0a2
version= 0.4.0a3
description = CIC notifications service
author = Louis Holbrook
author_email = dev@holbrook.no
@ -45,3 +45,4 @@ testing =
[options.entry_points]
console_scripts =
cic-notify-tasker = cic_notify.runnable.tasker:main
cic-notify-send = cic_notify.runnable.send:main