Discover queue for external tasks

This commit is contained in:
Louis Holbrook 2021-04-07 06:31:26 +00:00
parent 5c6375c9ec
commit 226699568f
4 changed files with 118 additions and 20 deletions

View File

@ -3,6 +3,7 @@ import logging
import re import re
# third-party imports # third-party imports
from celery.app.control import Inspect
import celery import celery
# local imports # local imports
@ -15,6 +16,29 @@ logg = logging.getLogger()
sms_tasks_matcher = r"^(cic_notify.tasks.sms)(\.\w+)?" sms_tasks_matcher = r"^(cic_notify.tasks.sms)(\.\w+)?"
re_q = r'^cic-notify'
def get_sms_queue_tasks(app, task_prefix='cic_notify.tasks.sms.'):
host_queues = []
i = Inspect(app=app)
qs = i.active_queues()
for host in qs.keys():
for q in qs[host]:
if re.match(re_q, q['name']):
host_queues.append((host, q['name'],))
task_prefix_len = len(task_prefix)
queue_tasks = []
for (host, queue) in host_queues:
i = Inspect(app=app, destination=[host])
for tasks in i.registered_tasks().values():
for task in tasks:
if len(task) >= task_prefix_len and task[:task_prefix_len] == task_prefix:
queue_tasks.append((queue, task,))
return queue_tasks
class Api: class Api:
# TODO: Implement callback strategy # TODO: Implement callback strategy
def __init__(self, queue='cic-notify'): def __init__(self, queue='cic-notify'):
@ -22,17 +46,9 @@ class Api:
:param queue: The queue on which to execute notification tasks :param queue: The queue on which to execute notification tasks
:type queue: str :type queue: str
""" """
registered_tasks = app.tasks self.sms_tasks = get_sms_queue_tasks(app)
self.sms_tasks = [] logg.debug('sms tasks {}'.format(self.sms_tasks))
for task in registered_tasks.keys():
logg.debug(f'Found: {task} {registered_tasks[task]}')
match = re.match(sms_tasks_matcher, task)
if match:
self.sms_tasks.append(task)
self.queue = queue
logg.info(f'api using queue: {self.queue}')
def sms(self, message, recipient): def sms(self, message, recipient):
"""This function chains all sms tasks in order to send a message, log and persist said data to disk """This function chains all sms tasks in order to send a message, log and persist said data to disk
@ -44,12 +60,17 @@ class Api:
:rtype: Celery.Task :rtype: Celery.Task
""" """
signatures = [] signatures = []
for task in self.sms_tasks: for q in self.sms_tasks:
signature = celery.signature(task) signature = celery.signature(
signatures.append(signature) q[1],
signature_group = celery.group(signatures) [
result = signature_group.apply_async( message,
args=[message, recipient], recipient,
queue=self.queue ],
queue=q[0],
) )
return result signatures.append(signature)
t = celery.group(signatures)()
return t

View File

@ -0,0 +1,76 @@
# standard imports
import sys
import os
import logging
import argparse
import tempfile
# external imports
import celery
import confini
# local imports
from cic_notify.api import Api
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
config_dir = os.path.join('/usr/local/etc/cic-notify')
argparser = argparse.ArgumentParser()
argparser.add_argument('-c', type=str, default=config_dir, help='config file')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('-v', action='store_true', help='be verbose')
argparser.add_argument('-vv', action='store_true', help='be more verbose')
argparser.add_argument('recipient', type=str, help='notification recipient')
argparser.add_argument('message', type=str, help='message text')
args = argparser.parse_args()
if args.vv:
logging.getLogger().setLevel(logging.DEBUG)
elif args.v:
logging.getLogger().setLevel(logging.INFO)
config = confini.Config(args.c, args.env_prefix)
config.process()
config.censor('PASSWORD', 'DATABASE')
config.add(args.recipient, '_RECIPIENT', True)
config.add(args.message, '_MESSAGE', True)
# set up celery
app = celery.Celery(__name__)
broker = config.get('CELERY_BROKER_URL')
if broker[:4] == 'file':
bq = tempfile.mkdtemp()
bp = tempfile.mkdtemp()
app.conf.update({
'broker_url': broker,
'broker_transport_options': {
'data_folder_in': bq,
'data_folder_out': bq,
'data_folder_processed': bp,
},
},
)
logg.warning('celery broker dirs queue i/o {} processed {}, will NOT be deleted on shutdown'.format(bq, bp))
else:
app.conf.update({
'broker_url': broker,
})
result = config.get('CELERY_RESULT_URL')
if result[:4] == 'file':
rq = tempfile.mkdtemp()
app.conf.update({
'result_backend': 'file://{}'.format(rq),
})
logg.warning('celery backend store dir {} created, will NOT be deleted on shutdown'.format(rq))
else:
app.conf.update({
'result_backend': result,
})
if __name__ == '__main__':
a = Api()
t = a.sms(config.get('_RECIPIENT'), config.get('_MESSAGE'))

View File

@ -1 +1 @@
cic_base[full_graph]~=0.1.2a46 cic_base[full_graph]~=0.1.2a61

View File

@ -1,6 +1,6 @@
[metadata] [metadata]
name = cic-notify name = cic-notify
version= 0.4.0a2 version= 0.4.0a3
description = CIC notifications service description = CIC notifications service
author = Louis Holbrook author = Louis Holbrook
author_email = dev@holbrook.no author_email = dev@holbrook.no
@ -45,3 +45,4 @@ testing =
[options.entry_points] [options.entry_points]
console_scripts = console_scripts =
cic-notify-tasker = cic_notify.runnable.tasker:main cic-notify-tasker = cic_notify.runnable.tasker:main
cic-notify-send = cic_notify.runnable.send:main