Merge remote-tracking branch 'origin/master' into lash/meta-index-phone

This commit is contained in:
nolash 2021-04-07 08:31:27 +02:00
commit 80dc84eb3a
5 changed files with 151 additions and 20 deletions

View File

@ -0,0 +1,33 @@
# standard imports
import logging
# external imports
from chainlib.stat import ChainStat
from chainlib.eth.block import (
block_latest,
block_by_number,
Block,
)
logg = logging.getLogger().getChild(__name__)
BLOCK_SAMPLES = 10
def init_chain_stat(rpc, block_start=0):
stat = ChainStat()
if block_start == 0:
o = block_latest()
r = rpc.do(o)
block_start = int(r, 16)
for i in range(BLOCK_SAMPLES):
o = block_by_number(block_start-10+i)
block_src = rpc.do(o)
logg.debug('block {}'.format(block_src))
block = Block(block_src)
stat.block_apply(block)
logg.debug('calculated block time {} from {} block samples'.format(stat.block_average(), BLOCK_SAMPLES))
return stat

View File

@ -3,6 +3,7 @@ import logging
import re import re
# third-party imports # third-party imports
from celery.app.control import Inspect
import celery import celery
# local imports # local imports
@ -15,6 +16,29 @@ logg = logging.getLogger()
sms_tasks_matcher = r"^(cic_notify.tasks.sms)(\.\w+)?" sms_tasks_matcher = r"^(cic_notify.tasks.sms)(\.\w+)?"
re_q = r'^cic-notify'
def get_sms_queue_tasks(app, task_prefix='cic_notify.tasks.sms.'):
host_queues = []
i = Inspect(app=app)
qs = i.active_queues()
for host in qs.keys():
for q in qs[host]:
if re.match(re_q, q['name']):
host_queues.append((host, q['name'],))
task_prefix_len = len(task_prefix)
queue_tasks = []
for (host, queue) in host_queues:
i = Inspect(app=app, destination=[host])
for tasks in i.registered_tasks().values():
for task in tasks:
if len(task) >= task_prefix_len and task[:task_prefix_len] == task_prefix:
queue_tasks.append((queue, task,))
return queue_tasks
class Api: class Api:
# TODO: Implement callback strategy # TODO: Implement callback strategy
def __init__(self, queue='cic-notify'): def __init__(self, queue='cic-notify'):
@ -22,17 +46,9 @@ class Api:
:param queue: The queue on which to execute notification tasks :param queue: The queue on which to execute notification tasks
:type queue: str :type queue: str
""" """
registered_tasks = app.tasks self.sms_tasks = get_sms_queue_tasks(app)
self.sms_tasks = [] logg.debug('sms tasks {}'.format(self.sms_tasks))
for task in registered_tasks.keys():
logg.debug(f'Found: {task} {registered_tasks[task]}')
match = re.match(sms_tasks_matcher, task)
if match:
self.sms_tasks.append(task)
self.queue = queue
logg.info(f'api using queue: {self.queue}')
def sms(self, message, recipient): def sms(self, message, recipient):
"""This function chains all sms tasks in order to send a message, log and persist said data to disk """This function chains all sms tasks in order to send a message, log and persist said data to disk
@ -44,12 +60,17 @@ class Api:
:rtype: Celery.Task :rtype: Celery.Task
""" """
signatures = [] signatures = []
for task in self.sms_tasks: for q in self.sms_tasks:
signature = celery.signature(task) signature = celery.signature(
q[1],
[
message,
recipient,
],
queue=q[0],
)
signatures.append(signature) signatures.append(signature)
signature_group = celery.group(signatures)
result = signature_group.apply_async( t = celery.group(signatures)()
args=[message, recipient],
queue=self.queue return t
)
return result

View File

@ -0,0 +1,76 @@
# standard imports
import sys
import os
import logging
import argparse
import tempfile
# external imports
import celery
import confini
# local imports
from cic_notify.api import Api
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
config_dir = os.path.join('/usr/local/etc/cic-notify')
argparser = argparse.ArgumentParser()
argparser.add_argument('-c', type=str, default=config_dir, help='config file')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('-v', action='store_true', help='be verbose')
argparser.add_argument('-vv', action='store_true', help='be more verbose')
argparser.add_argument('recipient', type=str, help='notification recipient')
argparser.add_argument('message', type=str, help='message text')
args = argparser.parse_args()
if args.vv:
logging.getLogger().setLevel(logging.DEBUG)
elif args.v:
logging.getLogger().setLevel(logging.INFO)
config = confini.Config(args.c, args.env_prefix)
config.process()
config.censor('PASSWORD', 'DATABASE')
config.add(args.recipient, '_RECIPIENT', True)
config.add(args.message, '_MESSAGE', True)
# set up celery
app = celery.Celery(__name__)
broker = config.get('CELERY_BROKER_URL')
if broker[:4] == 'file':
bq = tempfile.mkdtemp()
bp = tempfile.mkdtemp()
app.conf.update({
'broker_url': broker,
'broker_transport_options': {
'data_folder_in': bq,
'data_folder_out': bq,
'data_folder_processed': bp,
},
},
)
logg.warning('celery broker dirs queue i/o {} processed {}, will NOT be deleted on shutdown'.format(bq, bp))
else:
app.conf.update({
'broker_url': broker,
})
result = config.get('CELERY_RESULT_URL')
if result[:4] == 'file':
rq = tempfile.mkdtemp()
app.conf.update({
'result_backend': 'file://{}'.format(rq),
})
logg.warning('celery backend store dir {} created, will NOT be deleted on shutdown'.format(rq))
else:
app.conf.update({
'result_backend': result,
})
if __name__ == '__main__':
a = Api()
t = a.sms(config.get('_RECIPIENT'), config.get('_MESSAGE'))

View File

@ -1 +1 @@
cic_base[full_graph]~=0.1.2a46 cic_base[full_graph]~=0.1.2a61

View File

@ -1,6 +1,6 @@
[metadata] [metadata]
name = cic-notify name = cic-notify
version= 0.4.0a2 version= 0.4.0a3
description = CIC notifications service description = CIC notifications service
author = Louis Holbrook author = Louis Holbrook
author_email = dev@holbrook.no author_email = dev@holbrook.no
@ -45,3 +45,4 @@ testing =
[options.entry_points] [options.entry_points]
console_scripts = console_scripts =
cic-notify-tasker = cic_notify.runnable.tasker:main cic-notify-tasker = cic_notify.runnable.tasker:main
cic-notify-send = cic_notify.runnable.send:main