cic-internal-integration/apps/cic-notify/cic_notify/api.py

56 lines
1.6 KiB
Python
Raw Permalink Normal View History

2021-02-07 03:29:24 +01:00
# standard imports
import logging
import re
# third-party imports
import celery
# local imports
from cic_notify.tasks import sms
app = celery.current_app
logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger()
sms_tasks_matcher = r"^(cic_notify.tasks.sms)(\.\w+)?"
class Api:
# TODO: Implement callback strategy
def __init__(self, queue='cic-notify'):
"""
:param queue: The queue on which to execute notification tasks
:type queue: str
"""
registered_tasks = app.tasks
self.sms_tasks = []
for task in registered_tasks.keys():
logg.debug(f'Found: {task} {registered_tasks[task]}')
match = re.match(sms_tasks_matcher, task)
if match:
self.sms_tasks.append(task)
self.queue = queue
logg.info(f'api using queue: {self.queue}')
def sms(self, message, recipient):
"""This function chains all sms tasks in order to send a message, log and persist said data to disk
:param message: The message to be sent to the recipient.
:type message: str
:param recipient: The phone number of the recipient.
:type recipient: str
:return: a celery Task
:rtype: Celery.Task
"""
signatures = []
for task in self.sms_tasks:
signature = celery.signature(task)
signatures.append(signature)
signature_group = celery.group(signatures)
result = signature_group.apply_async(
args=[message, recipient],
queue=self.queue
)
return result