Cic notify build

This commit is contained in:
2021-02-07 02:29:24 +00:00
parent 85f06aa445
commit cebdb24ed6
61 changed files with 1650 additions and 27 deletions

View File

@@ -0,0 +1 @@
from .api import *

View File

@@ -0,0 +1,55 @@
# standard imports
import logging
import re
# third-party imports
import celery
# local imports
from cic_notify.tasks import sms
# the process-wide current celery application; tasks are looked up in its registry
app = celery.current_app

# NOTE(review): configuring the root logger to DEBUG at import time affects every
# consumer of this module — consider leaving log configuration to the entry point
logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger()

# matches fully-qualified task names under the cic_notify.tasks.sms package
# NOTE(review): the dots in the package path are unescaped, so each also matches
# any character — harmless in practice but worth confirming
sms_tasks_matcher = r"^(cic_notify.tasks.sms)(\.\w+)?"
class Api:
    """Client-facing entry point for dispatching notification tasks.

    On construction it scans the current celery app's task registry and
    remembers every task whose name lives under ``cic_notify.tasks.sms``.
    """
    # TODO: Implement callback strategy
    def __init__(self, queue='cic-notify'):
        """
        :param queue: The queue on which to execute notification tasks
        :type queue: str
        """
        self.sms_tasks = []
        # collect all registered sms tasks by matching their dotted names
        for name, task in app.tasks.items():
            logg.debug(f'Found: {name} {task}')
            if re.match(sms_tasks_matcher, name):
                self.sms_tasks.append(name)
        self.queue = queue
        logg.info(f'api using queue: {self.queue}')

    def sms(self, message, recipient):
        """Dispatch every registered sms task (send, persist, log) as one celery group.

        :param message: The message to be sent to the recipient.
        :type message: str
        :param recipient: The phone number of the recipient.
        :type recipient: str
        :return: a celery Task
        :rtype: Celery.Task
        """
        group = celery.group([celery.signature(task) for task in self.sms_tasks])
        return group.apply_async(
            args=[message, recipient],
            queue=self.queue
        )

View File

@@ -0,0 +1,34 @@
# standard imports
import os
import logging
# local imports
from cic_notify.db.models.base import SessionBase
logg = logging.getLogger()


def dsn_from_config(config):
    """Assemble a database connection string (DSN) from configuration values.

    For the ``sqlite`` engine only the database name (a filesystem path) is
    used; every other engine gets the full ``scheme://user:password@host:port/name``
    form.

    :param config: configuration store exposing ``get(key)`` (e.g. confini.Config)
    :type config: object
    :return: DSN suitable for SQLAlchemy's create_engine
    :rtype: str
    """
    scheme = config.get('DATABASE_ENGINE')
    # append an explicit driver, e.g. "postgresql+psycopg2"
    if config.get('DATABASE_DRIVER') is not None:
        scheme += '+{}'.format(config.get('DATABASE_DRIVER'))

    if config.get('DATABASE_ENGINE') == 'sqlite':
        # sqlite connects to a file path; no credentials or host apply
        dsn = '{}:///{}'.format(
            scheme,
            config.get('DATABASE_NAME'),
        )
    else:
        dsn = '{}://{}:{}@{}:{}/{}'.format(
            scheme,
            config.get('DATABASE_USER'),
            config.get('DATABASE_PASSWORD'),
            config.get('DATABASE_HOST'),
            config.get('DATABASE_PORT'),
            config.get('DATABASE_NAME'),
        )
    # NOTE: this logs the full DSN including the password at debug level
    logg.debug('parsed dsn from config: {}'.format(dsn))
    return dsn

View File

@@ -0,0 +1,7 @@
import enum
class NotificationStatusEnum(enum.Enum):
    """Delivery status of a notification; only UNKNOWN is defined so far."""
    # the state of the message is not known
    UNKNOWN = 'UNKNOWN'
class NotificationTransportEnum(enum.Enum):
    """Channel over which a notification is delivered; only SMS is supported."""
    SMS = 'SMS'

View File

@@ -0,0 +1 @@
Generic single-database configuration.

View File

@@ -0,0 +1,85 @@
# A generic, single database configuration.
[alembic]
# path to migration scripts
script_location = migrations
# template used to generate migration files
# file_template = %%(rev)s_%%(slug)s
# timezone to use when rendering the date
# within the migration file as well as the filename.
# string value is passed to dateutil.tz.gettz()
# leave blank for localtime
# timezone =
# max length of characters to apply to the
# "slug" field
# truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false
# version location specification; this defaults
# to migrations/versions. When using multiple version
# directories, initial revisions must be specified with --version-path
# version_locations = %(here)s/bar %(here)s/bat migrations/versions
# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8
# NOTE(review): SQLAlchemy 1.4+ rejects the "postgres" dialect name in favor of
# "postgresql" — confirm the SQLAlchemy version pinned for this project
sqlalchemy.url = postgres+psycopg2://postgres@localhost/cic-notify
[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples
# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks=black
# black.type=console_scripts
# black.entrypoint=black
# black.options=-l 79
# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

View File

@@ -0,0 +1,77 @@
from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from alembic import context
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
fileConfig(config.config_file_name)
# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
target_metadata = None
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def run_migrations_offline():
    """Run migrations in 'offline' mode.

    This configures the context with just a URL
    and not an Engine, though an Engine is acceptable
    here as well. By skipping the Engine creation
    we don't even need a DBAPI to be available.

    Calls to context.execute() here emit the given string to the
    script output.
    """
    url = config.get_main_option("sqlalchemy.url")
    context.configure(
        url=url,
        target_metadata=target_metadata,
        # render bound parameters inline so no live DB connection is needed
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
    )

    with context.begin_transaction():
        context.run_migrations()
def run_migrations_online():
    """Run migrations in 'online' mode.

    In this scenario we need to create an Engine
    and associate a connection with the context.
    """
    # build the engine from the [alembic] section's sqlalchemy.* keys;
    # NullPool avoids holding connections beyond the migration run
    connectable = engine_from_config(
        config.get_section(config.config_ini_section),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )

    with connectable.connect() as connection:
        context.configure(
            connection=connection, target_metadata=target_metadata
        )

        with context.begin_transaction():
            context.run_migrations()


# alembic sets offline mode when generating SQL scripts instead of executing
if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()

View File

@@ -0,0 +1,24 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
branch_labels = ${repr(branch_labels)}
depends_on = ${repr(depends_on)}
def upgrade():
${upgrades if upgrades else "pass"}
def downgrade():
${downgrades if downgrades else "pass"}

View File

@@ -0,0 +1,48 @@
"""Notify sms log
Revision ID: b2aedf79b0b2
Revises:
Create Date: 2020-10-11 15:59:02.765157
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'b2aedf79b0b2'
down_revision = None
branch_labels = None
depends_on = None
status_enum = sa.Enum(
'UNKNOWN', # the state of the message is not known
name='notification_status',
)
transport_enum = sa.Enum(
'SMS',
name='notification_transport',
)
def upgrade():
    """Create the notification table and its (transport, recipient) lookup index."""
    op.create_table('notification',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('transport', transport_enum, nullable=False),
        sa.Column('status', status_enum, nullable=False),
        sa.Column('status_code', sa.String(), nullable=True),
        # serial lets multiple status updates for one notification be ordered
        sa.Column('status_serial', sa.Integer(), nullable=False, server_default='0'),
        sa.Column('recipient', sa.String(), nullable=False),
        sa.Column('created', sa.DateTime(), nullable=False),
        sa.Column('updated', sa.DateTime(), nullable=False),
        sa.Column('message', sa.String(), nullable=False),
        sa.PrimaryKeyConstraint('id'),
    )
    op.create_index('notification_recipient_transport_idx', 'notification', ['transport', 'recipient'], schema=None, unique=False)
def downgrade():
    """Drop the notification table, its index, and the backing enum types."""
    # pass table_name explicitly: some backends (e.g. MySQL) require it to
    # locate the index, and it is harmless on PostgreSQL
    op.drop_index('notification_recipient_transport_idx', table_name='notification')
    op.drop_table('notification')
    # on PostgreSQL, dropping the table does not drop the enum types; remove
    # them explicitly so a later upgrade can recreate them cleanly
    status_enum.drop(op.get_bind(), checkfirst=False)
    transport_enum.drop(op.get_bind(), checkfirst=False)

View File

@@ -0,0 +1,40 @@
# third-party imports
from sqlalchemy import Column, Integer
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
# declarative base shared by every model in this package
Model = declarative_base(name='Model')


class SessionBase(Model):
    """Abstract ORM base holding the shared engine and session state."""
    __abstract__ = True

    # surrogate integer primary key inherited by all models
    id = Column(Integer, primary_key=True)

    # class-level shared state, populated by connect()/create_session()
    engine = None
    session = None
    query = None

    @staticmethod
    def create_session():
        """Create a new session bound to the shared engine, store and return it."""
        SessionBase.session = sessionmaker(bind=SessionBase.engine)()
        return SessionBase.session

    @staticmethod
    def _set_engine(engine):
        """Install the engine used for sessions and schema builds."""
        SessionBase.engine = engine

    @staticmethod
    def build():
        """Create all tables registered on the declarative metadata."""
        Model.metadata.create_all(bind=SessionBase.engine)

    @staticmethod
    def connect(dsn):
        """Create an engine for *dsn* and install it as the shared engine."""
        SessionBase._set_engine(create_engine(dsn))

View File

@@ -0,0 +1,27 @@
# standard imports
import datetime
# third-party imports
from sqlalchemy import Enum, Column, String, DateTime
# local imports
from .base import SessionBase
from ..enum import NotificationStatusEnum, NotificationTransportEnum
class Notification(SessionBase):
    """ORM model for a single notification record (one row per message sent)."""
    __tablename__ = 'notification'

    transport = Column(Enum(NotificationTransportEnum))
    status = Column(Enum(NotificationStatusEnum))
    recipient = Column(String)
    message = Column(String)
    created = Column(DateTime, default=datetime.datetime.utcnow)
    # onupdate keeps the timestamp current whenever the row is modified;
    # previously 'updated' was only ever set once, at insert time
    updated = Column(DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow)

    def __init__(self, transport, recipient, message, **kwargs):
        """
        :param transport: Channel the notification is delivered over.
        :type transport: NotificationTransportEnum
        :param recipient: The phone number of the recipient.
        :type recipient: str
        :param message: The message sent to the recipient.
        :type message: str
        """
        super(Notification, self).__init__(**kwargs)
        self.transport = transport
        self.recipient = recipient
        self.message = message
        # delivery outcome is not known at creation time
        self.status = NotificationStatusEnum.UNKNOWN

View File

@@ -0,0 +1,11 @@
class NotInitializedError(Exception):
    """Raised when a component is used before its initialize step has run."""
    pass


class AlreadyInitializedError(Exception):
    """Raised when initialization is attempted more than once."""
    pass


class PleaseCommitFirstError(Exception):
    """Raised when there exists uncommitted changes in the code while trying to build out the package."""
    pass

View File

@@ -0,0 +1,110 @@
# standard imports
import os
import logging
import importlib
import argparse
import tempfile
# third-party imports
import celery
import confini
# local imports
from cic_notify.db.models.base import SessionBase
from cic_notify.db import dsn_from_config
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
config_dir = os.path.join('/usr/local/etc/cic-notify')
argparser = argparse.ArgumentParser()
argparser.add_argument('-c', type=str, default=config_dir, help='config file')
argparser.add_argument('-q', type=str, default='cic-notify', help='queue name for worker tasks')
argparser.add_argument('-v', action='store_true', help='be verbose')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('-vv', action='store_true', help='be more verbose')
args = argparser.parse_args()
if args.vv:
logging.getLogger().setLevel(logging.DEBUG)
elif args.v:
logging.getLogger().setLevel(logging.INFO)
config = confini.Config(args.c, args.env_prefix)
config.process()
config.censor('PASSWORD', 'DATABASE')
# connect to database
dsn = dsn_from_config(config)
SessionBase.connect(dsn)

# verify database connection with minimal sanity query
session = SessionBase.create_session()
session.execute('select version_num from alembic_version')
session.close()

# set up celery
app = celery.Celery(__name__)

broker = config.get('CELERY_BROKER_URL')
if broker[:4] == 'file':
    # filesystem transport: producer and consumer share the same "in" folder
    bq = tempfile.mkdtemp()
    bp = tempfile.mkdtemp()
    app.conf.update({
        'broker_url': broker,
        'broker_transport_options': {
            'data_folder_in': bq,
            'data_folder_out': bq,
            'data_folder_processed': bp,
        },
    },
    )
    logg.warning('celery broker dirs queue i/o {} processed {}, will NOT be deleted on shutdown'.format(bq, bp))
else:
    app.conf.update({
        'broker_url': broker,
    })

result = config.get('CELERY_RESULT_URL')
if result[:4] == 'file':
    # filesystem result backend gets its own fresh temp dir
    rq = tempfile.mkdtemp()
    app.conf.update({
        'result_backend': 'file://{}'.format(rq),
    })
    logg.warning('celery backend store dir {} created, will NOT be deleted on shutdown'.format(rq))
else:
    app.conf.update({
        'result_backend': result,
    })

# every TASKS_* config entry names a python module of notification tasks;
# importing it registers its celery tasks on the current app
for key in config.store.keys():
    if key[:5] == 'TASKS':
        logg.info(f'adding sms task from {key}')
        module = importlib.import_module(config.store[key])
        if key == 'TASKS_AFRICASTALKING':
            # this backend additionally needs one-time SDK initialization
            africastalking_notifier = module.AfricasTalkingNotifier
            africastalking_notifier.initialize(
                config.get('AFRICASTALKING_API_USERNAME'),
                config.get('AFRICASTALKING_API_KEY'),
                config.get('AFRICASTALKING_API_SENDER_ID')
            )
def main():
    """Start a celery worker consuming the configured queue."""
    argv = ['worker']
    # mirror the daemon's own verbosity flags onto the worker process
    if args.vv:
        argv.append('--loglevel=DEBUG')
    elif args.v:
        argv.append('--loglevel=INFO')
    # consume from, and name the worker after, the configured queue
    argv += ['-Q', args.q, '-n', args.q]
    app.worker_main(argv)
if __name__ == '__main__':
main()

View File

@@ -0,0 +1 @@
from . import sms

View File

@@ -0,0 +1,12 @@
# standard imports
# third-party imports
# local imports
import celery
celery_app = celery.current_app
from .africastalking import send
from .db import persist_notification
from .log import log

View File

@@ -0,0 +1,69 @@
# standard imports
import logging
# third party imports
import celery
import africastalking
# local imports
from cic_notify.error import NotInitializedError, AlreadyInitializedError
logg = logging.getLogger()
celery_app = celery.current_app
class AfricasTalkingNotifier:
    """Thin wrapper around the africastalking SMS client.

    The SDK must be initialized exactly once per process via ``initialize``
    before any instance is constructed.
    """
    # None until initialize() has run, then True
    initiated = None
    # optional sender id attached to outgoing messages
    sender_id = None

    def __init__(self):
        if not self.initiated:
            raise NotInitializedError()
        self.api_client = africastalking.SMS

    @staticmethod
    def initialize(api_username, api_key, sender_id=None):
        """One-time SDK setup; raises if called twice.

        :param api_username: africastalking account username — presumably str, confirm against config
        :type api_username: str
        :param api_key: africastalking API key
        :type api_key: str
        :param sender_id: optional sender id for outgoing messages
        :type sender_id: str
        """
        if AfricasTalkingNotifier.initiated:
            raise AlreadyInitializedError()
        africastalking.initialize(username=api_username, api_key=api_key)
        AfricasTalkingNotifier.sender_id = sender_id
        AfricasTalkingNotifier.initiated = True

    def send(self, message, recipient):
        """Deliver *message* to *recipient*, using the sender id when configured.

        :param message: text to deliver
        :type message: str
        :param recipient: phone number of the recipient
        :type recipient: str
        """
        params = {'message': message, 'recipients': [recipient]}
        if self.sender_id:
            response = self.api_client.send(sender_id=self.sender_id, **params)
            logg.debug(f'Africastalking response sender-id {response}')
        else:
            response = self.api_client.send(**params)
            logg.debug(f'africastalking response no-sender-id {response}')
@celery_app.task
def send(message, recipient):
    """Celery task: deliver an sms message through Africa's Talking.

    :param message: text to deliver
    :type message: str
    :param recipient: phone number of the recipient
    :type recipient: str
    """
    # a fresh wrapper per task invocation; raises NotInitializedError if the
    # SDK was never initialized in this worker process
    AfricasTalkingNotifier().send(message=message, recipient=recipient)

View File

@@ -0,0 +1,26 @@
# standard imports
# third-party imports
import celery
# local imports
from cic_notify.db.models.notification import Notification
from cic_notify.db.enum import NotificationTransportEnum
celery_app = celery.current_app
@celery_app.task
def persist_notification(recipient, message):
    """Celery task: persist an SMS notification record to the database.

    :param recipient: The phone number of the recipient.
    :type recipient: str
    :param message: The message sent to the recipient.
    :type message: str
    """
    # NOTE(review): the session is never closed here; it lingers on the
    # class-level SessionBase.session until the next create_session() — confirm
    # whether an explicit close() is wanted after commit
    Notification.create_session()
    notification = Notification(transport=NotificationTransportEnum.SMS, recipient=recipient, message=message)
    Notification.session.add(notification)
    Notification.session.commit()

View File

@@ -0,0 +1,26 @@
# standard imports
import logging
import time
# third-party imports
import celery
celery_app = celery.current_app
logg = celery_app.log.get_default_logger()
local_logg = logging.getLogger(__name__)
@celery_app.task
def log(recipient, message):
    """Celery task: record an sms message on both the celery and module loggers.

    :param recipient: phone number of the recipient
    :type recipient: str
    :param message: text that was sent
    :type message: str
    """
    entry = f'[{time.time()}] {__name__} message to {recipient}: {message}'
    logg.info(entry)
    local_logg.info(entry)

View File

@@ -0,0 +1,47 @@
# standard imports
import logging
import time
# third-party imports
import semver
# local imports
from cic_notify.error import PleaseCommitFirstError
logg = logging.getLogger()
# package version as a (major, minor, patch, prerelease) tuple
version = (0, 4, 0, 'alpha.2')

version_object = semver.VersionInfo(
    major=version[0],
    minor=version[1],
    patch=version[2],
    prerelease=version[3],
)

version_string = str(version_object)


def git_hash():
    """Return the first 8 characters of the current git HEAD commit hash.

    :raises PleaseCommitFirstError: if the working tree has uncommitted changes
    :rtype: str
    """
    import subprocess
    git_diff = subprocess.run(['git', 'diff'], capture_output=True)
    if len(git_diff.stdout) > 0:
        raise PleaseCommitFirstError()
    git_hash = subprocess.run(['git', 'rev-parse', 'HEAD'], capture_output=True)
    git_hash_brief = git_hash.stdout.decode('utf-8')[:8]
    return git_hash_brief


try:
    version_git = git_hash()
    # NOTE(review): this branch joins build metadata with '.' while the
    # fallback below uses the semver '+build.' separator — likely inconsistent;
    # confirm which form downstream consumers expect
    version_string += '.build.{}'.format(version_git)
except FileNotFoundError:
    # git executable not available; fall back to a timestamp as build metadata
    time_string_pair = str(time.time()).split('.')
    version_string += '+build.{}{:<09d}'.format(
        time_string_pair[0],
        int(time_string_pair[1]),
    )

logg.info(f'Final version string will be {version_string}')

__version_string__ = version_string