From f67702cc79ed9b9831a16afaf98fb5881d09f1ef Mon Sep 17 00:00:00 2001 From: philip Date: Tue, 4 Jan 2022 18:38:48 +0300 Subject: [PATCH] Simplifies api module. --- apps/cic-notify/cic_notify/api.py | 61 +++----------------- apps/cic-notify/tests/cic_notify/test_api.py | 24 ++++++++ 2 files changed, 33 insertions(+), 52 deletions(-) diff --git a/apps/cic-notify/cic_notify/api.py b/apps/cic-notify/cic_notify/api.py index fb27f7bd..a8cfc573 100644 --- a/apps/cic-notify/cic_notify/api.py +++ b/apps/cic-notify/cic_notify/api.py @@ -3,6 +3,7 @@ import logging import re # third-party imports +import cic_notify.tasks.sms.db from celery.app.control import Inspect import celery @@ -13,45 +14,16 @@ app = celery.current_app logging.basicConfig(level=logging.DEBUG) logg = logging.getLogger() -sms_tasks_matcher = r"^(cic_notify.tasks.sms)(\.\w+)?" - - -re_q = r'^cic-notify' -def get_sms_queue_tasks(app, task_prefix='cic_notify.tasks.sms.'): - host_queues = [] - - i = Inspect(app=app) - qs = i.active_queues() - for host in qs.keys(): - for q in qs[host]: - if re.match(re_q, q['name']): - host_queues.append((host, q['name'],)) - - task_prefix_len = len(task_prefix) - queue_tasks = [] - for (host, queue) in host_queues: - i = Inspect(app=app, destination=[host]) - for tasks in i.registered_tasks().values(): - for task in tasks: - if len(task) >= task_prefix_len and task[:task_prefix_len] == task_prefix: - queue_tasks.append((queue, task,)) - - return queue_tasks - class Api: - # TODO: Implement callback strategy - def __init__(self, queue=None): + def __init__(self, queue: any = 'cic-notify'): """ :param queue: The queue on which to execute notification tasks :type queue: str """ self.queue = queue - self.sms_tasks = get_sms_queue_tasks(app) - logg.debug('sms tasks {}'.format(self.sms_tasks)) - - def sms(self, message, recipient): + def sms(self, message: str, recipient: str): """This function chains all sms tasks in order to send a message, log and persist said data to disk :param message: The message to be sent to the recipient. :type message: str @@ -60,24 +32,9 @@ class Api: :return: a celery Task :rtype: Celery.Task """ - signatures = [] - for q in self.sms_tasks: - - if not self.queue: - queue = q[0] - else: - queue = self.queue - - signature = celery.signature( - q[1], - [ - message, - recipient, - ], - queue=queue, - ) - signatures.append(signature) - - t = celery.group(signatures)() - - return t + s_send = celery.signature('cic_notify.tasks.sms.africastalking.send', [message, recipient], queue=self.queue) + s_log = celery.signature('cic_notify.tasks.sms.log.log', [message, recipient], queue=self.queue) + s_persist_notification = celery.signature( + 'cic_notify.tasks.sms.db.persist_notification', [message, recipient], queue=self.queue) + signatures = [s_send, s_log, s_persist_notification] + return celery.group(signatures)() diff --git a/apps/cic-notify/tests/cic_notify/test_api.py b/apps/cic-notify/tests/cic_notify/test_api.py index e69de29b..b8e7dd4f 100644 --- a/apps/cic-notify/tests/cic_notify/test_api.py +++ b/apps/cic-notify/tests/cic_notify/test_api.py @@ -0,0 +1,24 @@ +# standard imports + +# external imports +import celery + +# local imports +from cic_notify.api import Api + +# test imports +from tests.helpers.phone import phone_number + + +def test_api(celery_session_worker, mocker): + mocked_group = mocker.patch('celery.group') + message = 'Hello world.' + recipient = phone_number() + s_send = celery.signature('cic_notify.tasks.sms.africastalking.send', [message, recipient], queue=None) + s_log = celery.signature('cic_notify.tasks.sms.log.log', [message, recipient], queue=None) + s_persist_notification = celery.signature( + 'cic_notify.tasks.sms.db.persist_notification', [message, recipient], queue=None) + signatures = [s_send, s_log, s_persist_notification] + api = Api(queue=None) + api.sms(message, recipient) + mocked_group.assert_called_with(signatures)