Compare commits

..

2 Commits

Author SHA1 Message Date
nolash
184885b35f Upgrade deps 2021-04-06 21:06:24 +02:00
nolash
5a4ce54add Add block time calc to retry, tracker 2021-04-06 21:01:50 +02:00
7 changed files with 22 additions and 153 deletions

View File

@@ -1,33 +0,0 @@
# standard imports
import logging
# external imports
from chainlib.stat import ChainStat
from chainlib.eth.block import (
block_latest,
block_by_number,
Block,
)
logg = logging.getLogger().getChild(__name__)
BLOCK_SAMPLES = 10
def init_chain_stat(rpc, block_start=0):
stat = ChainStat()
if block_start == 0:
o = block_latest()
r = rpc.do(o)
block_start = int(r, 16)
for i in range(BLOCK_SAMPLES):
o = block_by_number(block_start-10+i)
block_src = rpc.do(o)
logg.debug('block {}'.format(block_src))
block = Block(block_src)
stat.block_apply(block)
logg.debug('calculated block time {} from {} block samples'.format(stat.block_average(), BLOCK_SAMPLES))
return stat

View File

@@ -3,7 +3,6 @@ import logging
import re
# third-party imports
from celery.app.control import Inspect
import celery
# local imports
@@ -16,29 +15,6 @@ logg = logging.getLogger()
sms_tasks_matcher = r"^(cic_notify.tasks.sms)(\.\w+)?"
re_q = r'^cic-notify'
def get_sms_queue_tasks(app, task_prefix='cic_notify.tasks.sms.'):
host_queues = []
i = Inspect(app=app)
qs = i.active_queues()
for host in qs.keys():
for q in qs[host]:
if re.match(re_q, q['name']):
host_queues.append((host, q['name'],))
task_prefix_len = len(task_prefix)
queue_tasks = []
for (host, queue) in host_queues:
i = Inspect(app=app, destination=[host])
for tasks in i.registered_tasks().values():
for task in tasks:
if len(task) >= task_prefix_len and task[:task_prefix_len] == task_prefix:
queue_tasks.append((queue, task,))
return queue_tasks
class Api:
# TODO: Implement callback strategy
def __init__(self, queue='cic-notify'):
@@ -46,9 +22,17 @@ class Api:
:param queue: The queue on which to execute notification tasks
:type queue: str
"""
self.sms_tasks = get_sms_queue_tasks(app)
logg.debug('sms tasks {}'.format(self.sms_tasks))
registered_tasks = app.tasks
self.sms_tasks = []
for task in registered_tasks.keys():
logg.debug(f'Found: {task} {registered_tasks[task]}')
match = re.match(sms_tasks_matcher, task)
if match:
self.sms_tasks.append(task)
self.queue = queue
logg.info(f'api using queue: {self.queue}')
def sms(self, message, recipient):
"""This function chains all sms tasks in order to send a message, log and persist said data to disk
@@ -60,17 +44,12 @@ class Api:
:rtype: Celery.Task
"""
signatures = []
for q in self.sms_tasks:
signature = celery.signature(
q[1],
[
message,
recipient,
],
queue=q[0],
)
for task in self.sms_tasks:
signature = celery.signature(task)
signatures.append(signature)
t = celery.group(signatures)()
return t
signature_group = celery.group(signatures)
result = signature_group.apply_async(
args=[message, recipient],
queue=self.queue
)
return result

View File

@@ -1,76 +0,0 @@
# standard imports
import sys
import os
import logging
import argparse
import tempfile
# external imports
import celery
import confini
# local imports
from cic_notify.api import Api
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
config_dir = os.path.join('/usr/local/etc/cic-notify')
argparser = argparse.ArgumentParser()
argparser.add_argument('-c', type=str, default=config_dir, help='config file')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('-v', action='store_true', help='be verbose')
argparser.add_argument('-vv', action='store_true', help='be more verbose')
argparser.add_argument('recipient', type=str, help='notification recipient')
argparser.add_argument('message', type=str, help='message text')
args = argparser.parse_args()
if args.vv:
logging.getLogger().setLevel(logging.DEBUG)
elif args.v:
logging.getLogger().setLevel(logging.INFO)
config = confini.Config(args.c, args.env_prefix)
config.process()
config.censor('PASSWORD', 'DATABASE')
config.add(args.recipient, '_RECIPIENT', True)
config.add(args.message, '_MESSAGE', True)
# set up celery
app = celery.Celery(__name__)
broker = config.get('CELERY_BROKER_URL')
if broker[:4] == 'file':
bq = tempfile.mkdtemp()
bp = tempfile.mkdtemp()
app.conf.update({
'broker_url': broker,
'broker_transport_options': {
'data_folder_in': bq,
'data_folder_out': bq,
'data_folder_processed': bp,
},
},
)
logg.warning('celery broker dirs queue i/o {} processed {}, will NOT be deleted on shutdown'.format(bq, bp))
else:
app.conf.update({
'broker_url': broker,
})
result = config.get('CELERY_RESULT_URL')
if result[:4] == 'file':
rq = tempfile.mkdtemp()
app.conf.update({
'result_backend': 'file://{}'.format(rq),
})
logg.warning('celery backend store dir {} created, will NOT be deleted on shutdown'.format(rq))
else:
app.conf.update({
'result_backend': result,
})
if __name__ == '__main__':
a = Api()
t = a.sms(config.get('_RECIPIENT'), config.get('_MESSAGE'))

View File

@@ -1 +1 @@
cic_base[full_graph]~=0.1.2a61
cic_base[full_graph]~=0.1.2a46

View File

@@ -1,6 +1,6 @@
[metadata]
name = cic-notify
version= 0.4.0a3
version= 0.4.0a2
description = CIC notifications service
author = Louis Holbrook
author_email = dev@holbrook.no
@@ -45,4 +45,3 @@ testing =
[options.entry_points]
console_scripts =
cic-notify-tasker = cic_notify.runnable.tasker:main
cic-notify-send = cic_notify.runnable.send:main

View File

@@ -88,7 +88,7 @@ signer = EIP155Signer(keystore)
nonce_oracle = RPCNonceOracle(signer_address, rpc)
registry = Registry(chain_spec)
registry = Registry()
o = registry.address_of(config.get('CIC_REGISTRY_ADDRESS'), 'AccountRegistry')
r = rpc.do(o)
account_registry_address = registry.parse_address_of(r)

View File

@@ -1,5 +1,5 @@
cic-base[full_graph]==0.1.2a61
sarafu-faucet==0.0.2a19
sarafu-faucet==0.0.2a17
cic-eth==0.11.0b1
cic-types==0.1.0a10
crypto-dev-signer==0.4.14a17