Merge branch 'lash/external-notify-tasks' into 'master'
Discover queue for external tasks See merge request grassrootseconomics/cic-internal-integration!89
This commit is contained in:
		
						commit
						5a4e0b8eba
					
				@ -3,6 +3,7 @@ import logging
 | 
			
		||||
import re
 | 
			
		||||
 | 
			
		||||
# third-party imports
 | 
			
		||||
from celery.app.control import Inspect
 | 
			
		||||
import celery
 | 
			
		||||
 | 
			
		||||
# local imports
 | 
			
		||||
@ -15,6 +16,29 @@ logg = logging.getLogger()
 | 
			
		||||
sms_tasks_matcher = r"^(cic_notify.tasks.sms)(\.\w+)?"
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
re_q = r'^cic-notify'
 | 
			
		||||
def get_sms_queue_tasks(app, task_prefix='cic_notify.tasks.sms.'):
 | 
			
		||||
    host_queues = []
 | 
			
		||||
 | 
			
		||||
    i = Inspect(app=app)
 | 
			
		||||
    qs = i.active_queues()
 | 
			
		||||
    for host in qs.keys():
 | 
			
		||||
        for q in qs[host]:
 | 
			
		||||
            if re.match(re_q, q['name']):
 | 
			
		||||
                host_queues.append((host, q['name'],))
 | 
			
		||||
   
 | 
			
		||||
    task_prefix_len = len(task_prefix)
 | 
			
		||||
    queue_tasks = []
 | 
			
		||||
    for (host, queue) in host_queues:
 | 
			
		||||
        i = Inspect(app=app, destination=[host])
 | 
			
		||||
        for tasks in i.registered_tasks().values():
 | 
			
		||||
            for task in tasks:
 | 
			
		||||
                if len(task) >= task_prefix_len and task[:task_prefix_len] == task_prefix:
 | 
			
		||||
                    queue_tasks.append((queue, task,))
 | 
			
		||||
    
 | 
			
		||||
    return queue_tasks
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Api:
 | 
			
		||||
    # TODO: Implement callback strategy
 | 
			
		||||
    def __init__(self, queue='cic-notify'):
 | 
			
		||||
@ -22,17 +46,9 @@ class Api:
 | 
			
		||||
        :param queue: The queue on which to execute notification tasks
 | 
			
		||||
        :type queue: str
 | 
			
		||||
        """
 | 
			
		||||
        registered_tasks = app.tasks
 | 
			
		||||
        self.sms_tasks = []
 | 
			
		||||
        self.sms_tasks = get_sms_queue_tasks(app)
 | 
			
		||||
        logg.debug('sms tasks {}'.format(self.sms_tasks))
 | 
			
		||||
 | 
			
		||||
        for task in registered_tasks.keys():
 | 
			
		||||
            logg.debug(f'Found: {task} {registered_tasks[task]}')
 | 
			
		||||
            match = re.match(sms_tasks_matcher, task)
 | 
			
		||||
            if match:
 | 
			
		||||
                self.sms_tasks.append(task)
 | 
			
		||||
 | 
			
		||||
        self.queue = queue
 | 
			
		||||
        logg.info(f'api using queue: {self.queue}')
 | 
			
		||||
 | 
			
		||||
    def sms(self, message, recipient):
 | 
			
		||||
        """This function chains all sms tasks in order to send a message, log and persist said data to disk
 | 
			
		||||
@ -44,12 +60,17 @@ class Api:
 | 
			
		||||
        :rtype: Celery.Task
 | 
			
		||||
        """
 | 
			
		||||
        signatures = []
 | 
			
		||||
        for task in self.sms_tasks:
 | 
			
		||||
            signature = celery.signature(task)
 | 
			
		||||
        for q in self.sms_tasks:
 | 
			
		||||
            signature = celery.signature(
 | 
			
		||||
                    q[1],
 | 
			
		||||
                    [
 | 
			
		||||
                        message,
 | 
			
		||||
                        recipient,
 | 
			
		||||
                        ],
 | 
			
		||||
                    queue=q[0],
 | 
			
		||||
                    )
 | 
			
		||||
            signatures.append(signature)
 | 
			
		||||
            signature_group = celery.group(signatures)
 | 
			
		||||
            result = signature_group.apply_async(
 | 
			
		||||
                args=[message, recipient],
 | 
			
		||||
                queue=self.queue
 | 
			
		||||
            )
 | 
			
		||||
            return result
 | 
			
		||||
 | 
			
		||||
        t = celery.group(signatures)()
 | 
			
		||||
 | 
			
		||||
        return t
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										76
									
								
								apps/cic-notify/cic_notify/runnable/send.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										76
									
								
								apps/cic-notify/cic_notify/runnable/send.py
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,76 @@
 | 
			
		||||
# standard imports
 | 
			
		||||
import sys
 | 
			
		||||
import os
 | 
			
		||||
import logging
 | 
			
		||||
import argparse
 | 
			
		||||
import tempfile
 | 
			
		||||
 | 
			
		||||
# external imports
 | 
			
		||||
import celery
 | 
			
		||||
import confini
 | 
			
		||||
 | 
			
		||||
# local imports
 | 
			
		||||
from cic_notify.api import Api
 | 
			
		||||
 | 
			
		||||
logging.basicConfig(level=logging.WARNING)
 | 
			
		||||
logg = logging.getLogger()
 | 
			
		||||
 | 
			
		||||
config_dir = os.path.join('/usr/local/etc/cic-notify')
 | 
			
		||||
 | 
			
		||||
argparser = argparse.ArgumentParser()
 | 
			
		||||
argparser.add_argument('-c', type=str, default=config_dir, help='config file')
 | 
			
		||||
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
 | 
			
		||||
argparser.add_argument('-v', action='store_true', help='be verbose')
 | 
			
		||||
argparser.add_argument('-vv', action='store_true', help='be more verbose')
 | 
			
		||||
argparser.add_argument('recipient', type=str, help='notification recipient')
 | 
			
		||||
argparser.add_argument('message', type=str, help='message text')
 | 
			
		||||
args = argparser.parse_args()
 | 
			
		||||
 | 
			
		||||
if args.vv:
 | 
			
		||||
    logging.getLogger().setLevel(logging.DEBUG)
 | 
			
		||||
elif args.v:
 | 
			
		||||
    logging.getLogger().setLevel(logging.INFO)
 | 
			
		||||
 | 
			
		||||
config = confini.Config(args.c, args.env_prefix)
 | 
			
		||||
config.process()
 | 
			
		||||
config.censor('PASSWORD', 'DATABASE')
 | 
			
		||||
config.add(args.recipient, '_RECIPIENT', True)
 | 
			
		||||
config.add(args.message, '_MESSAGE', True)
 | 
			
		||||
 | 
			
		||||
# set up celery
 | 
			
		||||
app = celery.Celery(__name__)
 | 
			
		||||
 | 
			
		||||
broker = config.get('CELERY_BROKER_URL')
 | 
			
		||||
if broker[:4] == 'file':
 | 
			
		||||
    bq = tempfile.mkdtemp()
 | 
			
		||||
    bp = tempfile.mkdtemp()
 | 
			
		||||
    app.conf.update({
 | 
			
		||||
            'broker_url': broker,
 | 
			
		||||
            'broker_transport_options': {
 | 
			
		||||
                'data_folder_in': bq,
 | 
			
		||||
                'data_folder_out': bq,
 | 
			
		||||
                'data_folder_processed': bp,
 | 
			
		||||
            },
 | 
			
		||||
            },
 | 
			
		||||
            )
 | 
			
		||||
    logg.warning('celery broker dirs queue i/o {} processed {}, will NOT be deleted on shutdown'.format(bq, bp))
 | 
			
		||||
else:
 | 
			
		||||
    app.conf.update({
 | 
			
		||||
        'broker_url': broker,
 | 
			
		||||
        })
 | 
			
		||||
 | 
			
		||||
result = config.get('CELERY_RESULT_URL')
 | 
			
		||||
if result[:4] == 'file':
 | 
			
		||||
    rq = tempfile.mkdtemp()
 | 
			
		||||
    app.conf.update({
 | 
			
		||||
        'result_backend': 'file://{}'.format(rq),
 | 
			
		||||
        })
 | 
			
		||||
    logg.warning('celery backend store dir {} created, will NOT be deleted on shutdown'.format(rq))
 | 
			
		||||
else:
 | 
			
		||||
    app.conf.update({
 | 
			
		||||
        'result_backend': result,
 | 
			
		||||
        })
 | 
			
		||||
 | 
			
		||||
if __name__ == '__main__':
 | 
			
		||||
    a = Api()
 | 
			
		||||
    t = a.sms(config.get('_RECIPIENT'), config.get('_MESSAGE'))
 | 
			
		||||
@ -1 +1 @@
 | 
			
		||||
cic_base[full_graph]~=0.1.2a46
 | 
			
		||||
cic_base[full_graph]~=0.1.2a61
 | 
			
		||||
 | 
			
		||||
@ -1,6 +1,6 @@
 | 
			
		||||
[metadata]
 | 
			
		||||
name = cic-notify
 | 
			
		||||
version= 0.4.0a2
 | 
			
		||||
version= 0.4.0a3
 | 
			
		||||
description = CIC notifications service
 | 
			
		||||
author = Louis Holbrook
 | 
			
		||||
author_email = dev@holbrook.no
 | 
			
		||||
@ -45,3 +45,4 @@ testing =
 | 
			
		||||
[options.entry_points]
 | 
			
		||||
console_scripts =
 | 
			
		||||
	cic-notify-tasker = cic_notify.runnable.tasker:main
 | 
			
		||||
	cic-notify-send = cic_notify.runnable.send:main
 | 
			
		||||
 | 
			
		||||
		Loading…
	
		Reference in New Issue
	
	Block a user